Only run normalization when needed #16794
Definitely need to change the Optional.get() == null check; I believe everything else is a recommendation.
if (origin != null) {
  return origin.value();
} else {
  return null;
}
I know this method is preexisting, but it feels unnecessary. The FailureOrigin is provided by the caller and this simply calls a method on the provided class?
Additionally this method is only called once:
for (final FailureReason reason : failureSummary.getFailures()) {
MetricClientFactory.getMetricClient().count(OssMetricsRegistry.ATTEMPT_FAILED_BY_FAILURE_ORIGIN, 1,
new MetricAttribute(MetricTags.FAILURE_ORIGIN, MetricTags.getFailureOrigin(reason.getFailureOrigin())));
}
Seems that we could move this functionality to this^ spot instead of having a dependency on MetricTags. Unless there is something I am missing here?
This was from when I was testing and running into odd errors - I'm not sure this actually ever happens and I can remove this since it's unrelated anyway!
import java.util.Optional;

public class AttemptNormalizationStatus {
Could this be replaced with a record class?
Should normalizationFailed be a boolean instead, forcing a true/false to be provided?
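To make the suggestion above concrete, here is a minimal sketch of what a record version might look like. It assumes Java 16 or later. The component names and types are guesses based on the accessors quoted elsewhere in this review (recordsCommitted, normalizationFailed); attemptNumber is hypothetical, and the real class may differ.

```java
import java.util.Objects;
import java.util.Optional;

// Hypothetical record form of AttemptNormalizationStatus. The components are
// assumptions inferred from the accessors quoted in this review.
record AttemptNormalizationStatus(long attemptNumber,
                                  Optional<Long> recordsCommitted,
                                  boolean normalizationFailed) {

  // Compact constructor: reject a null Optional so callers can rely on
  // isEmpty()/isPresent() instead of null checks.
  AttemptNormalizationStatus {
    Objects.requireNonNull(recordsCommitted, "recordsCommitted");
  }
}
```

With a primitive boolean component, every caller has to supply true or false, so a null "failed" state cannot be represented at all.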
@Inject
private JobPersistence jobPersistence;
Per both my preference and Micronaut's preference:
"Field injection makes it harder to understand a class’s requirements, making it easier to get a NullPointerException when testing a class using Field Injection. We recommend you use Constructor Injection."
Could we change this to be constructor injected?
Also, the @AllArgsConstructor will create a constructor that wins over the field injection. This is probably working because we don't have any qualifiers or configuration properties here. However, per @colesnodgrass's request, we should use explicit constructors here and avoid Lombok/field injection to make the code easier to read.
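A rough sketch of the explicit-constructor shape being requested, in plain Java with no framework annotations so it compiles on its own. The JobPersistence interface here is a hypothetical stand-in with one illustrative method (countAttempts is not from the source), and the class is simplified from the one in the PR.

```java
import java.util.Objects;

// Hypothetical stand-in for the real JobPersistence interface.
interface JobPersistence {
  long countAttempts(long jobId); // illustrative method, not from the source
}

// Constructor injection: the dependency is explicit, final, and cannot be
// silently left unset the way an injected field can.
class NormalizationSummaryCheckActivityImpl {

  private final JobPersistence jobPersistence;

  // A DI container can supply this argument; a unit test passes it directly.
  public NormalizationSummaryCheckActivityImpl(final JobPersistence jobPersistence) {
    this.jobPersistence = Objects.requireNonNull(jobPersistence, "jobPersistence");
  }

  long attempts(final long jobId) {
    return jobPersistence.countAttempts(jobId);
  }
}
```

In a test, the dependency can then be a lambda or a mock passed to the constructor, with no container or reflection involved.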
@Override
@SuppressWarnings("PMD.AvoidLiteralsInIfCondition")
public Boolean shouldRunNormalization(final Long jobId, final Long attemptNumber, final Optional<Long> numCommittedRecords) throws IOException {
Should this return boolean instead? It appears we only return true/false and never null.
// if normalization failed on past attempt,
// add number of records committed on that attempt to
// total committed number
if (n.getNormalizationFailed()) {
If getNormalizationFailed returned a boolean instead of a Boolean, this if statement could be removed, as the above if statement already verified it was false.
}

final List<AttemptNormalizationStatus> attemptNormalizationStatuses = jobPersistence.getAttemptNormalizationStatusesForJob(jobId);
final AtomicReference<Long> totalRecordsCommitted = new AtomicReference<>(0L);
could be an AtomicLong
final List<AttemptNormalizationStatus> attemptNormalizationStatuses = jobPersistence.getAttemptNormalizationStatusesForJob(jobId);
final AtomicReference<Long> totalRecordsCommitted = new AtomicReference<>(0L);
final AtomicReference<Boolean> shouldReturnTrue = new AtomicReference<>(false);
could be an AtomicBoolean
if (n.getRecordsCommitted().get() == null) {
  shouldReturnTrue.set(true);
  return;
} else if (n.getRecordsCommitted().get() != 0L) {
  totalRecordsCommitted.set(totalRecordsCommitted.get() + n.getRecordsCommitted().get());
}
Calling Optional.get() == null doesn't work, as Optional.get() throws a NoSuchElementException if the optional is empty.
Maybe replace this with Optional.ifPresentOrElse?
n.getRecordsCommitted().ifPresentOrElse(
    // can use addAndGet if using an AtomicLong
    numRecords -> totalRecordsCommitted.addAndGet(numRecords),
    () -> shouldReturnTrue.set(true)
);
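For reference, a self-contained sketch of the suggestion above, assuming Java 9+ for Optional.ifPresentOrElse. It is deliberately simplified: it takes a plain list of optional counts rather than AttemptNormalizationStatus objects and ignores the normalizationFailed condition, so the class and method names are hypothetical and this is not the PR's actual logic. The second method only demonstrates why a `get() == null` check can never be true on an empty Optional.

```java
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

class CommittedRecordsSketch {

  // Returns true when normalization should run: either some attempt has no
  // committed-record count (unknown, so err on the side of running) or the
  // total of the known counts is positive.
  static boolean shouldRun(final List<Optional<Long>> recordsCommittedPerAttempt) {
    final AtomicLong total = new AtomicLong(0L);
    final AtomicBoolean unknownCount = new AtomicBoolean(false);

    recordsCommittedPerAttempt.forEach(records -> records.ifPresentOrElse(
        total::addAndGet,                // value present: add it to the running total
        () -> unknownCount.set(true)));  // value absent: remember that a count is missing

    return unknownCount.get() || total.get() > 0;
  }

  // An empty Optional never yields null from get(); it throws instead.
  static boolean getThrowsOnEmpty() {
    try {
      Optional.<Long>empty().get();
      return false;
    } catch (final NoSuchElementException e) {
      return true;
    }
  }
}
```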
// were committed records and run normalization
if (n.recordsCommitted().isEmpty()) {
  shouldReturnTrue.set(true);
  return;
This return is not needed here, as pointed out by my IDE:
'return' is unnecessary as the last statement in a 'void' method
The return is to break out of the iteration block instead of returning a value from the method overall.
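One detail worth spelling out, assuming the snippet under discussion runs inside a forEach lambda (the use of AtomicReference suggests this, but the surrounding loop is not shown in the diff): a return inside such a lambda ends only the current invocation, so it behaves like continue rather than break. The class and method names below are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

class LambdaReturnSketch {

  // Collects the elements for which the lambda body runs past the early return.
  static List<Integer> visitedAfterReturn(final List<Integer> input) {
    final List<Integer> visited = new ArrayList<>();
    input.forEach(n -> {
      if (n < 0) {
        return; // ends this invocation only; forEach moves on to the next element
      }
      visited.add(n);
    });
    return visited;
  }
}
```

Calling visitedAfterReturn(List.of(1, -1, 2)) still visits the 2 after the negative element. If the return is the last statement reached in its branch and nothing follows the if/else chain, removing it does not change behavior, which would be consistent with the IDE warning.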
  shouldReturnTrue.set(true);
  return;
} else if (n.recordsCommitted().get() != 0L) {
  totalRecordsCommitted.set(totalRecordsCommitted.get() + n.recordsCommitted().get());
.set should be replaced with .addAndGet(n.recordsCommitted().get()), as this avoids the totalRecordsCommitted.get() call.
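A small side-by-side sketch of the two forms. Note that addAndGet is a method of AtomicLong, not of AtomicReference<Long>, so this change goes together with the earlier AtomicLong suggestion. The class and method names are hypothetical.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

class AddAndGetSketch {

  // Read-then-write in two separate steps, as in the reviewed code.
  static long withSet(final long start, final long delta) {
    final AtomicReference<Long> total = new AtomicReference<>(start);
    total.set(total.get() + delta);
    return total.get();
  }

  // Single atomic update; AtomicLong also avoids boxing the value.
  static long withAddAndGet(final long start, final long delta) {
    final AtomicLong total = new AtomicLong(start);
    return total.addAndGet(delta);
  }
}
```

Both produce the same result in single-threaded code; the second is shorter and stays correct if the accumulator is ever updated from more than one thread.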
One recommended change from the set call to addAndGet, otherwise 👍🏻
private final Optional<JobPersistence> jobPersistence;

public NormalizationSummaryCheckActivityImpl(final Optional<JobPersistence> jobPersistence) {
What's the use case for the optional JobPersistence?
If it's related to the data plane worker, feels like we may want to have a follow up to be able to use API instead of direct DB access here, otherwise multi-cloud will run into this issue again.
Yes, that's exactly the reason. I agree; I didn't realize this while I was writing it, but changing this to use the API later makes sense. It's not hugely crucial in this case, since all that will happen is that normalization will run when it doesn't need to.
In the end, this PR is still a step forward. I think this also highlights the fact that we need to focus on removing direct DB access from the workers.
cc @jdpgrailsdev in case you were keeping track of the many reasons we should be doing this ;)
Looks good, just want to double check on how we skip normalization, it might be a bit aggressive but I am not that familiar with the following steps.
// if the count of committed records for this attempt is > 0 OR if it is null,
// then we should run normalization
if (numCommittedRecords.get() == null || numCommittedRecords.get() > 0) {
nit: the null check should be numCommittedRecords.isEmpty() or .isPresent()
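As an illustration of the nit, two equivalent ways to write the check without calling get() on a possibly empty Optional. This assumes Java 11+ for Optional.isEmpty; the class and method names are hypothetical, and the methods return a primitive boolean in line with the earlier comment about the return type.

```java
import java.util.Optional;

class CurrentAttemptSketch {

  // Run normalization when the count is unknown (empty) or positive.
  // get() is only reached when a value is present.
  static boolean hasRecordsOrUnknown(final Optional<Long> numCommittedRecords) {
    return numCommittedRecords.isEmpty() || numCommittedRecords.get() > 0;
  }

  // Equivalent form without get(): map to the predicate, default to true when empty.
  static boolean hasRecordsOrUnknownMapped(final Optional<Long> numCommittedRecords) {
    return numCommittedRecords.map(n -> n > 0).orElse(true);
  }
}
```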
}
if (!shouldRun) {
  LOGGER.info("Skipping normalization because there are no records to normalize.");
  break;
Are we sure we want to break in this case? I think there may be other steps after normalization, like dbt for example.
Only run normalization when records have been committed
Resolves #9672
Only run normalization if there are records that have been committed that have not yet been normalized