RUM-1983 Introduce the BatchProcessingLevel API #1686

Conversation

mariusc83 (Member) commented Nov 2, 2023

What does this PR do?

Modifies the core logic to fetch a list of batches instead of a single batch. This is done through a new configuration, BatchProcessingLevel, which controls how many batches are processed sequentially, without a delay, within one reading/uploading cycle. It currently exposes 3 levels: LOW, MEDIUM and HIGH, which translate to 1, 10 and 100 processed batches respectively. By default, up to 10 batches are taken per cycle.

This logic improves data upload when batch back pressure occurs.
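
For illustration, here is a minimal sketch of what this could look like from the caller's side. The enum shape and the builder method name (setBatchProcessingLevel) are assumptions based on the description above, not necessarily the final public API:

```kotlin
// Assumed shape: each level maps to the max number of batches consumed
// sequentially, without delay, within one reading/uploading cycle.
enum class BatchProcessingLevel(val maxBatchesPerUploadJob: Int) {
    LOW(1),
    MEDIUM(10), // default
    HIGH(100)
}

// Hypothetical usage when building the SDK configuration.
val configuration = Configuration.Builder(clientToken = "<client token>", env = "prod")
    .setBatchProcessingLevel(BatchProcessingLevel.HIGH)
    .build()
```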

Motivation

What inspired you to submit this pull request?

Additional Notes

Anything else we should know when reviewing?

Review checklist (to be filled by reviewers)

  • Feature or bugfix MUST have appropriate tests (unit, integration, e2e)
  • Make sure you discussed the feature or bugfix with the maintaining team in an Issue
  • Make sure each commit and the PR mention the Issue number (cf the CONTRIBUTING doc)

@mariusc83 mariusc83 self-assigned this Nov 2, 2023
lock.countDown()
var batchesSent = 0
while (batchesSent < maxBatchesPersUpload) {
val lock = CountDownLatch(1)
Member Author:

TODO: extract this in a method

@mariusc83 mariusc83 force-pushed the mconstantin/rum-1983/introduce-the-batch-processing-level-api branch 2 times, most recently from bd18a0e to b63f229 Compare November 2, 2023 13:00
noBatchCallback = {
increaseInterval()
lock.countDown()
batchesSent = maxBatchesPersUpload
Member:

This line is misleading and not actually true; the line with lastUploadWasSuccess below should be enough. Btw, is it possible to just use break here?

Member Author:

Yes, a break would have been nice; unfortunately the compiler complains because the break would be inside the lambda, outside of the loop itself :(. I wonder if there is any workaround for this in Kotlin; I could not find any except using a flag.
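
For context, a simplified sketch of the flag-based exit described here; the callback and field names are illustrative rather than the exact internal API:

```kotlin
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit

// `break` is not available because the exit decision happens inside a callback
// lambda, so the loop counter is pushed past the limit instead.
var batchesSent = 0
while (batchesSent < maxBatchesPerUploadJob) {
    val lock = CountDownLatch(1)
    storage.readNextBatch(
        noBatchCallback = {
            // nothing left to read: make the while condition fail on the next check
            batchesSent = maxBatchesPerUploadJob
            lock.countDown()
        },
        batchCallback = { batch, batchMeta ->
            consumeBatch(batch, batchMeta)
            batchesSent++
            lock.countDown()
        }
    )
    lock.await(batchUploadWaitTimeoutMs, TimeUnit.MILLISECONDS)
}
```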

batchMeta
)
if (status !is UploadStatus.Success) {
batchesSent = maxBatchesPersUpload
Member:

This is not quite true; why isn't relying only on lastUploadWasSuccess enough?

Member Author:

Yes, it was a leftover, sorry for the trouble ;)

}
} finally {
batchesSent++
lastUploadWasSuccess = true
Member:

Is this to try reading the next batch?

Member Author:

Yes... don't mind the logic there, it's a bit broken: that flag should be set to true only if the status was OK. I just wanted to get a review on the approach, sorry for the inconvenience.

val batch = reader.read()
val batchMeta = reader.currentMetadata()

val status = consumeBatch(
Member:

Are we OK with the fact that each call to consumeBatch will decrease the scheduling interval in case of upload success? Should we decrease the interval only when the whole queue is processed?

@mariusc83 mariusc83 force-pushed the mconstantin/rum-1983/introduce-the-batch-processing-level-api branch 3 times, most recently from 62cce16 to 7d8bdac Compare November 3, 2023 13:00
}
}
lock.await(batchUploadWaitTimeoutMs, TimeUnit.MILLISECONDS)
var batchConsumerCoins = maxBatchesPerAsyncJob
Member:

What are batchConsumerCoins? Does this mean remaining batch upload attempts?

Member Author:

Yes... I need to find a better name there.

@@ -85,6 +71,40 @@ internal class DataUploadRunnable(

// region Internal

private fun handleBatchConsumingJobFrequency(lastBatchUploadStatus: UploadStatus) {
if (lastBatchUploadStatus is UploadStatus.Success) {
decreaseInterval()
Member:

Since we decrease and increase at a constant rate of X%, doesn't this expose us to the thundering herd problem? Should we add some sort of random seed here?

Also, there might be a potential issue here. Let's say a client can send X batches in a time period, but X+1 batches in that period will cause a network issue for that user. The user sends X, receives success and scales up, then sends X+1, receives failure so scales down, then X and scales up, then X+1 and scales down - in a loop. About half of all network requests will therefore fail causing wasted network for the client. Perhaps we could optimize this?

Member Author:

That's part of a different task, where we will apply the things discussed in the Retry Strategy RFC that Ganesh wrote.

Regarding the second question: we only increase/decrease the interval after a loop cycle, not after each request in that loop. I am not sure I get your question in this case; maybe a Zoom discussion on this if you want?
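
To make the discussion concrete, a rough sketch of the per-cycle adjustment (the else branch and method names are an assumption based on the visible diff, not a verbatim copy):

```kotlin
// Illustrative only: the interval is adjusted once per reading/uploading cycle,
// based on the status of the last batch upload attempted in that cycle.
private fun handleBatchConsumingJobFrequency(lastBatchUploadStatus: UploadStatus) {
    if (lastBatchUploadStatus is UploadStatus.Success) {
        decreaseInterval() // speed up while uploads keep succeeding
    } else {
        increaseInterval() // back off after a failed upload
    }
}
```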

@mariusc83 mariusc83 force-pushed the mconstantin/rum-1983/introduce-the-batch-processing-level-api branch 4 times, most recently from 31684b5 to 95e89ef Compare November 6, 2023 13:52
@mariusc83 mariusc83 marked this pull request as ready for review November 6, 2023 13:52
@mariusc83 mariusc83 requested a review from a team as a code owner November 6, 2023 13:52
@mariusc83 mariusc83 force-pushed the mconstantin/rum-1983/introduce-the-batch-processing-level-api branch 2 times, most recently from 13ef4e5 to 9678379 Compare November 6, 2023 15:00
@@ -125,12 +153,11 @@ internal class DataUploadRunnable(
storage.confirmBatchRead(batchId, removalReason) {
if (status.shouldRetry) {
Member:

minor: then the whole lambda can be simplified as it.markAsRead(deleteBatch = !status.shouldRetry)
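
A sketch of that simplification, assuming the original lambda only branched on status.shouldRetry to pick the deleteBatch value:

```kotlin
// Before: explicit branching on the retry flag.
storage.confirmBatchRead(batchId, removalReason) {
    if (status.shouldRetry) {
        it.markAsRead(deleteBatch = false)
    } else {
        it.markAsRead(deleteBatch = true)
    }
}

// After: the whole lambda collapses into a single call.
storage.confirmBatchRead(batchId, removalReason) {
    it.markAsRead(deleteBatch = !status.shouldRetry)
}
```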

@@ -60,7 +60,7 @@ import java.util.concurrent.TimeUnit
ExtendWith(ForgeExtension::class)
)
@MockitoSettings(strictness = Strictness.LENIENT)
@ForgeConfiguration(Configurator::class)
@ForgeConfiguration(Configurator::class, seed = 0xf4c7ed013062L)
Member:

debugging leftover? :)

Comment on lines 99 to 105
// to make sure the basic tests are not flaky
// the cases where the maxBatchesPerUploadJob is variable are tested in dedicated test
// methods
fakeDataUploadConfiguration = fakeDataUploadConfiguration.copy(maxBatchesPerUploadJob = 1)
Member:

Where does the flakiness come from? In fact it seems we are always testing the case where maxBatchesPerUploadJob == 1, because I don't see any place where it would have a different value.

Member Author:

Maybe it's a mistake on my end; I will update those tests where it should have a different value. I wanted to make sure that the basic tests covering the increase/decrease interval logic are based on a single value only, to avoid problems and complexity. I will add a better explanation.

@mariusc83 mariusc83 force-pushed the mconstantin/rum-1983/introduce-the-batch-processing-level-api branch 3 times, most recently from 56c252c to 26eead2 Compare November 7, 2023 13:54
@@ -14,6 +14,7 @@ import fr.xgouchet.elmyr.ForgeryFactory
internal class DataUploadConfigurationForgeryFactory : ForgeryFactory<DataUploadConfiguration> {
override fun getForgery(forge: Forge): DataUploadConfiguration {
val frequency: UploadFrequency = forge.getForgery()
return DataUploadConfiguration(frequency)
// we limit the size to avoid OOM errors inside our tests
return DataUploadConfiguration(frequency, forge.anInt(min = 1, max = 50))
Member:

I think the upper limit should be at least 100, to make sure that everything works fine with the HIGH value. Maybe we can use LOW for the min and HIGH for the max here? Otherwise, what is the guarantee that it won't OOM when using HIGH in production?

Also, I'm curious where the OOM comes from.

Member Author:

maxBatchesPerUpload gives the size of the batches list in some of our tests. That number can go up to Int.MAX_VALUE, and it creates problems... at least I had those on my end. I guess a rule of thumb would be to stick with small sizes when using lists of objects in our tests.

Member:

Yes, but since we define HIGH = 100, we need to set at least 100 as the max to make sure things work for HIGH as well (although the allocation pattern would be different in unit tests compared to an on-device run, it is still better to align IMO).
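
A possible shape of that change, bounding the forged value by the level constants (this assumes BatchProcessingLevel exposes its batch count, e.g. through a maxBatchesPerUploadJob property):

```kotlin
internal class DataUploadConfigurationForgeryFactory : ForgeryFactory<DataUploadConfiguration> {
    override fun getForgery(forge: Forge): DataUploadConfiguration {
        val frequency: UploadFrequency = forge.getForgery()
        // keep the forged value within the range reachable through the public API
        val maxBatchesPerUploadJob = forge.anInt(
            min = BatchProcessingLevel.LOW.maxBatchesPerUploadJob,       // 1
            max = BatchProcessingLevel.HIGH.maxBatchesPerUploadJob + 1   // max is exclusive in Forge
        )
        return DataUploadConfiguration(frequency, maxBatchesPerUploadJob)
    }
}
```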

if (lastBatchUploadStatus != null) {
handleBatchConsumingJobFrequency(lastBatchUploadStatus)
} else {
// there was no batch left in the storage so we increase the interval
Member:

It doesn't necessarily mean that there is no batch left; it can also be a problem with a batch.

Member Author:

So do you think we should not increase the interval, or not add the comment? To me it makes sense to increase the interval.

Member:

I mean this comment should be updated, currently it may be misleading.

do {
batchConsumerAvailableAttempts--
lastBatchUploadStatus = handleNextBatch(context)
} while (batchConsumerAvailableAttempts > 0 &&
Member:

Upload logic question: if there was a problem with a particular batch and we know it was deleted (it wasn't a retryable error), can't we continue with the queue (because obviously we won't see this batch again)? Or would this change not give us much benefit?

Member Author:

I think it would add more complexity... I would rather wait and see how this behaves in telemetry and, if we see a problem, add it then. I am also afraid we would have to change the way we handle our interval: if we have 2 failures in one loop, which one do we take into account for the interval decrease/increase? The last one?
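
For reference, an illustrative completion of the loop shown above; the exact exit condition is an assumption consistent with the surrounding discussion, not a verbatim copy of the internal code:

```kotlin
// Keep consuming batches while attempts remain and the previous upload succeeded.
var batchConsumerAvailableAttempts = maxBatchesPerUploadJob
var lastBatchUploadStatus: UploadStatus? = null
do {
    batchConsumerAvailableAttempts--
    lastBatchUploadStatus = handleNextBatch(context)
} while (batchConsumerAvailableAttempts > 0 &&
    lastBatchUploadStatus is UploadStatus.Success
)
```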

mockThreadPoolExecutor,
times(
5 *
fakeDataUploadConfiguration.maxBatchesPerUploadJob
Member:

Suggested change
fakeDataUploadConfiguration.maxBatchesPerUploadJob
expectedBatchesHandled

@Forgery batch: List<RawBatchEvent>,
@StringForgery batchMeta: String,
forge: Forge
@IntForgery(1, 10) runCount: Int,
Member:

why do we reduce it?

Member Author:

I did not see the point of having it that high. Keep in mind that with maxBatchesProcessingLevel we are now increasing the number of iterations, so the runCount can be smaller. Any particular reason you wanted it that high?

Member:

No, I was just curious what the reason for this change was.

}

@Test
fun `𝕄 exhaust the available batches W run {maxBatchesPerJob higher availableBatches}`(
Member:

Seems like it should be "higher or equal" according to the test.

codecov-commenter commented Nov 9, 2023

Codecov Report

Merging #1686 (7dbfa0a) into develop (fd22c1f) will decrease coverage by 0.06%.
Report is 6 commits behind head on develop.
The diff coverage is 92.86%.

@@             Coverage Diff             @@
##           develop    #1686      +/-   ##
===========================================
- Coverage    83.73%   83.67%   -0.06%     
===========================================
  Files          462      463       +1     
  Lines        15855    15886      +31     
  Branches      2365     2370       +5     
===========================================
+ Hits         13275    13292      +17     
- Misses        1939     1952      +13     
- Partials       641      642       +1     
Files Coverage Δ
...android/api/storage/FeatureStorageConfiguration.kt 100.00% <100.00%> (ø)
...android/core/configuration/BatchProcessingLevel.kt 100.00% <100.00%> (ø)
...n/com/datadog/android/core/internal/CoreFeature.kt 87.83% <100.00%> (+0.05%) ⬆️
.../internal/configuration/DataUploadConfiguration.kt 100.00% <100.00%> (ø)
...core/internal/data/upload/v2/DataUploadRunnable.kt 97.14% <100.00%> (+0.48%) ⬆️
...atadog/android/core/configuration/Configuration.kt 93.98% <66.67%> (-2.23%) ⬇️
...in/com/datadog/android/core/internal/SdkFeature.kt 88.15% <75.00%> (-0.91%) ⬇️

... and 18 files with indirect coverage changes

@@ -20,14 +21,17 @@ import com.datadog.android.core.configuration.UploadFrequency
* value will be taken from core configuration.
* @property batchSize the desired batch size policy.If not explicitly provided this
Member:

Suggested change
* @property batchSize the desired batch size policy.If not explicitly provided this
* @property batchSize the desired batch size policy. If not explicitly provided this

@mariusc83 mariusc83 force-pushed the mconstantin/rum-1983/introduce-the-batch-processing-level-api branch 2 times, most recently from 523d10a to 612a23f Compare November 9, 2023 11:53
RUM-1983 Introduce the BatchProcessingLevel API
@mariusc83 mariusc83 force-pushed the mconstantin/rum-1983/introduce-the-batch-processing-level-api branch from 612a23f to 7dbfa0a Compare November 9, 2023 11:54
@mariusc83 mariusc83 requested a review from 0xnm November 9, 2023 12:56
@mariusc83 mariusc83 merged commit b6112d9 into develop Nov 9, 2023
@mariusc83 mariusc83 deleted the mconstantin/rum-1983/introduce-the-batch-processing-level-api branch November 9, 2023 14:04
@xgouchet xgouchet added this to the 2.3.0 milestone Dec 13, 2023