RUM-1983 Introduce the BatchProcessingLevel API #1686
Conversation
lock.countDown()
var batchesSent = 0
while (batchesSent < maxBatchesPersUpload) {
    val lock = CountDownLatch(1)
TODO: extract this into a method
noBatchCallback = {
    increaseInterval()
    lock.countDown()
    batchesSent = maxBatchesPersUpload
This line is misleading and not actually true; the line with lastUploadWasSuccess below should be enough. Btw, is it possible to just use break here?
Yes, a break would have been nice; unfortunately the compiler complains about using a break outside of the function/class :(. I wonder if there is any workaround for this in Kotlin; I could not find any except using a flag.
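For context, a minimal sketch (names assumed, not the PR's actual code) of the usual Kotlin workaround: wrapping the loop in a labeled run block lets a labeled return behave like break, because run and repeat are inline functions. For a stored callback such as noBatchCallback the lambda is not inlined, so a non-local return is not allowed and a flag remains the simplest option.

fun drainBatches(maxBatches: Int, readBatch: () -> String?) {
    run drain@{
        repeat(maxBatches) {
            // a null read means nothing is left: exit the loop like a break would
            val batch = readBatch() ?: return@drain
            println("uploading $batch")
        }
    }
}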
    batchMeta
)
if (status !is UploadStatus.Success) {
    batchesSent = maxBatchesPersUpload
This is not quite true; why is relying only on lastUploadWasSuccess not enough?
Yes, it was a leftover, sorry for the trouble ;)
    }
} finally {
    batchesSent++
    lastUploadWasSuccess = true
Is this to try reading the next batch?
Yes... do not mind the logic there, it is a bit broken: that flag should be marked as true only if the status was ok. I just wanted to get a review on the approach, sorry for the inconvenience.
val batch = reader.read()
val batchMeta = reader.currentMetadata()

val status = consumeBatch(
Are we ok with each call to consumeBatch decreasing the scheduling interval in case of upload success? Should we decrease the interval only when the whole queue is processed?
    }
}
lock.await(batchUploadWaitTimeoutMs, TimeUnit.MILLISECONDS)
var batchConsumerCoins = maxBatchesPerAsyncJob
What are batchConsumerCoins? Does this mean remaining batch upload attempts?
Yes... I need to find a better name there.
@@ -85,6 +71,40 @@ internal class DataUploadRunnable(

// region Internal

private fun handleBatchConsumingJobFrequency(lastBatchUploadStatus: UploadStatus) {
    if (lastBatchUploadStatus is UploadStatus.Success) {
        decreaseInterval()
Since we decrease and increase at a constant rate of X%, doesn't this expose us to the thundering herd problem? Should we add some sort of random jitter here?
Also, there might be an issue here. Let's say a client can send X batches in a time period, but X+1 batches in that period will cause a network issue for that user. The user sends X, receives success and scales up, then sends X+1, receives failure so scales down, then X and scales up, then X+1 and scales down - in a loop. About half of all network requests will therefore fail, wasting network bandwidth for the client. Perhaps we could optimize this?
That's part of a different task, where we will apply the things discussed in the Retry strategy RFC that Ganesh wrote.
Regarding the second question: we are only increasing/decreasing after a loop cycle, not after each request in that loop. I am not sure I get your question in this case; maybe a Zoom discussion on this if you want?
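As a side note, a minimal sketch (not part of this PR, names assumed) of the jitter idea raised above: each interval adjustment is multiplied by a small random factor so that many clients do not end up resynchronizing their upload schedules.

import kotlin.random.Random

fun nextUploadIntervalMs(currentMs: Long, lastUploadSucceeded: Boolean): Long {
    // base adjustment: speed up after a success, slow down after a failure
    val factor = if (lastUploadSucceeded) 0.90 else 1.10
    // +/- 5% random jitter to avoid a thundering herd of synchronized clients
    val jitter = Random.nextDouble(0.95, 1.05)
    return (currentMs * factor * jitter).toLong()
}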
@@ -125,12 +153,11 @@ internal class DataUploadRunnable(
storage.confirmBatchRead(batchId, removalReason) {
    if (status.shouldRetry) {
minor: then the whole lambda can be simplified as it.markAsRead(deleteBatch = !status.shouldRetry)
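A sketch of that simplification (the confirmation callback parameter is assumed to expose markAsRead(deleteBatch: Boolean)), applied to the snippet quoted above:

storage.confirmBatchRead(batchId, removalReason) {
    // delete the batch unless the status asks for a retry
    it.markAsRead(deleteBatch = !status.shouldRetry)
}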
@@ -60,7 +60,7 @@ import java.util.concurrent.TimeUnit
ExtendWith(ForgeExtension::class)
)
@MockitoSettings(strictness = Strictness.LENIENT)
@ForgeConfiguration(Configurator::class)
@ForgeConfiguration(Configurator::class, seed = 0xf4c7ed013062L)
debugging leftover? :)
// to make sure the basic tests are not flaky
// the cases where the maxBatchesPerUploadJob is variable are tested in dedicated test
// methods
fakeDataUploadConfiguration = fakeDataUploadConfiguration.copy(maxBatchesPerUploadJob = 1)
Where does the flakiness come from? In fact it seems we are always testing the case where maxBatchesPerUploadJob == 1, because I don't see any place where it would have a different value.
Maybe it's a mistake on my end; I will update those tests where it should have a different value. I wanted to make sure that the basic tests covering the increase/decrease interval are based on a single value only, to avoid problems and complexity. I will add a better explanation.
@@ -14,6 +14,7 @@ import fr.xgouchet.elmyr.ForgeryFactory
internal class DataUploadConfigurationForgeryFactory : ForgeryFactory<DataUploadConfiguration> {
    override fun getForgery(forge: Forge): DataUploadConfiguration {
        val frequency: UploadFrequency = forge.getForgery()
        return DataUploadConfiguration(frequency)
        // we limit the size to avoid OOM errors inside our tests
        return DataUploadConfiguration(frequency, forge.anInt(min = 1, max = 50))
I think the upper limit should be at least 100, to make sure that everything is working fine with the HIGH value. Maybe we can use LOW for the min and HIGH for the max here? Otherwise what is the guarantee that it won't OOM when using HIGH in production?
Also I'm curious where the OOM comes from.
The maxBatchesPerUpload gives the size of the batches list in some of our tests. That number can go up to Int.MAX and it creates problems... at least I had those on my end. I guess a rule of thumb would be to stick with small sizes when using lists of objects in our tests.
Yes, but since we have the definition HIGH = 100, we need to set at least 100 as max to make sure things work for HIGH as well (although the allocation pattern would eventually be different in unit tests compared to the on-device run, it is still better to align imo).
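A sketch of that suggestion (the maxBatchesPerUploadJob property on BatchProcessingLevel is assumed): derive the forgery bounds from the real LOW and HIGH levels so the HIGH = 100 case stays covered while keeping the value small enough for tests.

internal class DataUploadConfigurationForgeryFactory :
    ForgeryFactory<DataUploadConfiguration> {
    override fun getForgery(forge: Forge): DataUploadConfiguration {
        val frequency: UploadFrequency = forge.getForgery()
        // bound the forged value by the real processing levels instead of an arbitrary 1..50
        val maxBatches = forge.anInt(
            min = BatchProcessingLevel.LOW.maxBatchesPerUploadJob,
            max = BatchProcessingLevel.HIGH.maxBatchesPerUploadJob + 1
        )
        return DataUploadConfiguration(frequency, maxBatches)
    }
}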
if (lastBatchUploadStatus != null) {
    handleBatchConsumingJobFrequency(lastBatchUploadStatus)
} else {
    // there was no batch left in the storage so we increase the interval
Not necessarily; it does not have to mean that there is no batch left, it can also be a problem with a batch.
So do you think we should not increase the interval, or not add the comment? For me it makes sense to increase the interval.
I mean this comment should be updated, currently it may be misleading.
do {
    batchConsumerAvailableAttempts--
    lastBatchUploadStatus = handleNextBatch(context)
} while (batchConsumerAvailableAttempts > 0 &&
Upload logic question: if there was a problem with a particular batch and we know that it was deleted (it wasn't a retryable error), can't we continue with the queue (because obviously we won't see this batch again)? Or would this change not give us much benefit?
I think it would add more complexity... I would rather wait and see how this behaves in telemetry and, in case we see a problem, add this then. I am afraid we would then have to change the way we handle our interval: if we have 2 failures in one loop, which one do we take into account for the interval decrease/increase? The last one?
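For reference, a rough sketch (not the PR's code, the exact loop condition is assumed) of what the suggestion would look like: keep draining while the attempts budget lasts, stopping only on a retryable failure, since a non-retryable one means the batch was deleted and will not be seen again.

do {
    batchConsumerAvailableAttempts--
    lastBatchUploadStatus = handleNextBatch(context)
} while (
    batchConsumerAvailableAttempts > 0 &&
    // a success keeps draining; a non-retryable failure also keeps draining
    (lastBatchUploadStatus is UploadStatus.Success ||
        lastBatchUploadStatus?.shouldRetry == false)
)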
mockThreadPoolExecutor,
times(
    5 *
        fakeDataUploadConfiguration.maxBatchesPerUploadJob
Suggested change:
- fakeDataUploadConfiguration.maxBatchesPerUploadJob
+ expectedBatchesHandled
@Forgery batch: List<RawBatchEvent>,
@StringForgery batchMeta: String,
forge: Forge
@IntForgery(1, 10) runCount: Int,
why do we reduce it?
I did not see the point of having it that high. Keep in mind that with the maxBatchesProcessingLevel we are now increasing the number of iterations, so the runCount could be smaller. Any particular reason you wanted it that high?
no, I was just curious what was the reason for this change
}

@Test
fun `𝕄 exhaust the available batches W run {maxBatchesPerJob higher availableBatches}`(
Seems like "higher or equal" according to the test.
Codecov Report

@@             Coverage Diff              @@
##           develop     #1686      +/-   ##
============================================
- Coverage    83.73%    83.67%     -0.06%
============================================
  Files          462       463         +1
  Lines        15855     15886        +31
  Branches      2365      2370         +5
============================================
+ Hits         13275     13292        +17
- Misses        1939      1952        +13
- Partials       641       642         +1
@@ -20,14 +21,17 @@ import com.datadog.android.core.configuration.UploadFrequency
 * value will be taken from core configuration.
 * @property batchSize the desired batch size policy.If not explicitly provided this
Suggested change:
- * @property batchSize the desired batch size policy.If not explicitly provided this
+ * @property batchSize the desired batch size policy. If not explicitly provided this
What does this PR do?
Modifies the core logic to consume a list of batches instead of a single batch. This is done through a new configuration called BatchProcessingLevel, which controls how many batches are processed sequentially, without a delay, within one reading/uploading cycle. It currently exposes 3 levels: low, medium and high, which translate to 1, 10 and 100 batches processed respectively. By default, up to 10 batches are processed per cycle.
This logic improves the data upload when batch back pressure occurs.
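A minimal usage sketch (builder method and enum names are assumed to match the PR description):

val configuration = Configuration.Builder(
    clientToken = "<client-token>",
    env = "prod"
)
    // process up to 100 batches per reading/uploading cycle; the default level takes up to 10
    .setBatchProcessingLevel(BatchProcessingLevel.HIGH)
    .build()
Datadog.initialize(context, configuration, TrackingConsent.GRANTED)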
Motivation
What inspired you to submit this pull request?
Additional Notes
Anything else we should know when reviewing?
Review checklist (to be filled by reviewers)