From 5567856efd8e7a0f416ef912f9f01f53214cd9bb Mon Sep 17 00:00:00 2001 From: Davin Chia Date: Mon, 13 Jun 2022 10:19:23 -0700 Subject: [PATCH 1/3] Refactor schema validation for better reading. --- .../general/DefaultReplicationWorker.java | 56 +++++++++++-------- 1 file changed, 32 insertions(+), 24 deletions(-) diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/general/DefaultReplicationWorker.java b/airbyte-workers/src/main/java/io/airbyte/workers/general/DefaultReplicationWorker.java index bc1d3e14e47db..a8be8683c9764 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/general/DefaultReplicationWorker.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/general/DefaultReplicationWorker.java @@ -306,31 +306,11 @@ private static Runnable getReplicationRunnable(final AirbyteSource source, } catch (final Exception e) { throw new SourceException("Source process read attempt failed", e); } - if (messageOptional.isPresent()) { - if (messageOptional.get().getRecord() != null) { - final AirbyteRecordMessage message = messageOptional.get().getRecord(); - final String messageStream = WorkerUtils.streamNameWithNamespace(message.getNamespace(), message.getStream()); - // validate a record's schema if there are less than 10 records with validation errors - if (validationErrors.get(messageStream) == null || validationErrors.get(messageStream).getRight() < 10) { - try { - recordSchemaValidator.validateSchema(messageOptional.get().getRecord(), messageStream); - } catch (final RecordSchemaValidationException e) { - final ImmutablePair<Set<String>, Integer> exceptionWithCount = validationErrors.get(messageStream); - if (exceptionWithCount == null) { - validationErrors.put(messageStream, new ImmutablePair<>(e.errorMessages, 1)); - } else { - final Integer currentCount = exceptionWithCount.getRight(); - final Set<String> currentErrorMessages = exceptionWithCount.getLeft(); - final Set<String> updatedErrorMessages = - Stream.concat(currentErrorMessages.stream(), 
e.errorMessages.stream()).collect(Collectors.toSet()); - validationErrors.put(messageStream, new ImmutablePair<>(updatedErrorMessages, currentCount + 1)); - } - } - - } - } - final AirbyteMessage message = mapper.mapMessage(messageOptional.get()); + if (messageOptional.isPresent()) { + final AirbyteMessage airbyteMessage = messageOptional.get(); + validateSchema(recordSchemaValidator, validationErrors, airbyteMessage); + final AirbyteMessage message = mapper.mapMessage(airbyteMessage); messageTracker.acceptFromSource(message); try { @@ -386,6 +366,34 @@ private static Runnable getReplicationRunnable(final AirbyteSource source, }; } + private static void validateSchema(RecordSchemaValidator recordSchemaValidator, + Map<String, ImmutablePair<Set<String>, Integer>> validationErrors, + AirbyteMessage message) { + if (message.getRecord() == null) { + return; + } + + final AirbyteRecordMessage record = message.getRecord(); + final String messageStream = WorkerUtils.streamNameWithNamespace(record.getNamespace(), record.getStream()); + // validate a record's schema if there are less than 10 records with validation errors + if (validationErrors.get(messageStream) == null || validationErrors.get(messageStream).getRight() < 10) { + try { + recordSchemaValidator.validateSchema(record, messageStream); + } catch (final RecordSchemaValidationException e) { + final ImmutablePair<Set<String>, Integer> exceptionWithCount = validationErrors.get(messageStream); + if (exceptionWithCount == null) { + validationErrors.put(messageStream, new ImmutablePair<>(e.errorMessages, 1)); + } else { + final Integer currentCount = exceptionWithCount.getRight(); + final Set<String> currentErrorMessages = exceptionWithCount.getLeft(); + final Set<String> updatedErrorMessages = Stream.concat(currentErrorMessages.stream(), e.errorMessages.stream()).collect(Collectors.toSet()); + validationErrors.put(messageStream, new ImmutablePair<>(updatedErrorMessages, currentCount + 1)); + } + } + + } + } + private static Runnable getDestinationOutputRunnable(final 
AirbyteDestination destination, final AtomicBoolean cancelled, final MessageTracker messageTracker, From fdb3b4727252d90bfd80bff142737b50877e4ecb Mon Sep 17 00:00:00 2001 From: Davin Chia Date: Mon, 13 Jun 2022 10:22:32 -0700 Subject: [PATCH 2/3] Better comments. --- .../airbyte/workers/general/DefaultReplicationWorker.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/general/DefaultReplicationWorker.java b/airbyte-workers/src/main/java/io/airbyte/workers/general/DefaultReplicationWorker.java index a8be8683c9764..b27af3499c23d 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/general/DefaultReplicationWorker.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/general/DefaultReplicationWorker.java @@ -375,8 +375,10 @@ private static void validateSchema(RecordSchemaValidator recordSchemaValidator, final AirbyteRecordMessage record = message.getRecord(); final String messageStream = WorkerUtils.streamNameWithNamespace(record.getNamespace(), record.getStream()); - // validate a record's schema if there are less than 10 records with validation errors - if (validationErrors.get(messageStream) == null || validationErrors.get(messageStream).getRight() < 10) { + // avoid noise by validating only if the stream has less than 10 records with validation errors + final boolean streamHasLessThenTenErrs = + validationErrors.get(messageStream) == null || validationErrors.get(messageStream).getRight() < 10; + if (streamHasLessThenTenErrs) { try { recordSchemaValidator.validateSchema(record, messageStream); } catch (final RecordSchemaValidationException e) { From 306638b9e9a93d2f8113b00fc3828e7e103af9c2 Mon Sep 17 00:00:00 2001 From: Davin Chia Date: Mon, 13 Jun 2022 10:43:32 -0700 Subject: [PATCH 3/3] Format. 
--- .../io/airbyte/workers/general/DefaultReplicationWorker.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/general/DefaultReplicationWorker.java b/airbyte-workers/src/main/java/io/airbyte/workers/general/DefaultReplicationWorker.java index b27af3499c23d..4a45bf15de11e 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/general/DefaultReplicationWorker.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/general/DefaultReplicationWorker.java @@ -377,7 +377,7 @@ private static void validateSchema(RecordSchemaValidator recordSchemaValidator, final String messageStream = WorkerUtils.streamNameWithNamespace(record.getNamespace(), record.getStream()); // avoid noise by validating only if the stream has less than 10 records with validation errors final boolean streamHasLessThenTenErrs = - validationErrors.get(messageStream) == null || validationErrors.get(messageStream).getRight() < 10; + validationErrors.get(messageStream) == null || validationErrors.get(messageStream).getRight() < 10; if (streamHasLessThenTenErrs) { try { recordSchemaValidator.validateSchema(record, messageStream);