
RUM-1983 Introduce the BatchProcessingLevel API
mariusc83 committed Nov 3, 2023
1 parent d8f8c22 commit 7d8bdac
Showing 8 changed files with 100 additions and 29 deletions.
6 changes: 6 additions & 0 deletions dd-sdk-android-core/api/apiSurface
@@ -153,6 +153,11 @@ class com.datadog.android.core.SdkReference
constructor(String? = null, (com.datadog.android.api.SdkCore) -> Unit = {})
fun get(): com.datadog.android.api.SdkCore?
fun <T> allowThreadDiskReads(() -> T): T
enum com.datadog.android.core.configuration.BatchProcessingLevel
constructor(Int)
- LOW
- MEDIUM
- HIGH
enum com.datadog.android.core.configuration.BatchSize
constructor(Long)
- SMALL
@@ -168,6 +173,7 @@ data class com.datadog.android.core.configuration.Configuration
fun useSite(com.datadog.android.DatadogSite): Builder
fun setBatchSize(BatchSize): Builder
fun setUploadFrequency(UploadFrequency): Builder
fun setBatchProcessingLevel(BatchProcessingLevel): Builder
fun setAdditionalConfiguration(Map<String, Any>): Builder
fun setProxy(java.net.Proxy, okhttp3.Authenticator?): Builder
fun setEncryption(com.datadog.android.security.Encryption): Builder
10 changes: 10 additions & 0 deletions dd-sdk-android-core/api/dd-sdk-android-core.api
@@ -457,6 +457,15 @@ public final class com/datadog/android/core/StrictModeExtKt {
public static final fun allowThreadDiskReads (Lkotlin/jvm/functions/Function0;)Ljava/lang/Object;
}

public final class com/datadog/android/core/configuration/BatchProcessingLevel : java/lang/Enum {
public static final field HIGH Lcom/datadog/android/core/configuration/BatchProcessingLevel;
public static final field LOW Lcom/datadog/android/core/configuration/BatchProcessingLevel;
public static final field MEDIUM Lcom/datadog/android/core/configuration/BatchProcessingLevel;
public final fun getMaxBatchesPerUpload ()I
public static fun valueOf (Ljava/lang/String;)Lcom/datadog/android/core/configuration/BatchProcessingLevel;
public static fun values ()[Lcom/datadog/android/core/configuration/BatchProcessingLevel;
}

public final class com/datadog/android/core/configuration/BatchSize : java/lang/Enum {
public static final field LARGE Lcom/datadog/android/core/configuration/BatchSize;
public static final field MEDIUM Lcom/datadog/android/core/configuration/BatchSize;
@@ -481,6 +490,7 @@ public final class com/datadog/android/core/configuration/Configuration$Builder
public synthetic fun <init> (Ljava/lang/String;Ljava/lang/String;Ljava/lang/String;Ljava/lang/String;ILkotlin/jvm/internal/DefaultConstructorMarker;)V
public final fun build ()Lcom/datadog/android/core/configuration/Configuration;
public final fun setAdditionalConfiguration (Ljava/util/Map;)Lcom/datadog/android/core/configuration/Configuration$Builder;
public final fun setBatchProcessingLevel (Lcom/datadog/android/core/configuration/BatchProcessingLevel;)Lcom/datadog/android/core/configuration/Configuration$Builder;
public final fun setBatchSize (Lcom/datadog/android/core/configuration/BatchSize;)Lcom/datadog/android/core/configuration/Configuration$Builder;
public final fun setCrashReportsEnabled (Z)Lcom/datadog/android/core/configuration/Configuration$Builder;
public final fun setEncryption (Lcom/datadog/android/security/Encryption;)Lcom/datadog/android/core/configuration/Configuration$Builder;
New file: BatchProcessingLevel.kt
@@ -0,0 +1,20 @@
/*
* Unless explicitly stated otherwise all files in this repository are licensed under the Apache License Version 2.0.
* This product includes software developed at Datadog (https://www.datadoghq.com/).
* Copyright 2016-Present Datadog, Inc.
*/

package com.datadog.android.core.configuration

/**
* Defines the policy for processing batches within a single upload cycle.
* A higher level means more batches (and therefore more data) are sent per upload cycle,
* at the cost of more CPU and memory used to process them.
* A lower level means less data is sent per upload cycle, using less CPU and memory.
* @property maxBatchesPerUpload the maximum number of batches processed in a single upload cycle.
*/
enum class BatchProcessingLevel(val maxBatchesPerUpload: Int) {
LOW(1),
MEDIUM(10),
HIGH(100)
}
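
For orientation (not part of the diff): a minimal sketch of how an application could opt into the new level once this change ships. The Configuration.Builder arguments are illustrative placeholders (assuming the usual client token / environment parameters); if setBatchProcessingLevel is not called, the default stays at BatchProcessingLevel.MEDIUM, as set in the default core configuration further down in this commit.

import com.datadog.android.core.configuration.BatchProcessingLevel
import com.datadog.android.core.configuration.Configuration

fun buildSdkConfiguration(): Configuration {
    // "<CLIENT_TOKEN>" and "<ENV_NAME>" are placeholders for the app's real values.
    return Configuration.Builder("<CLIENT_TOKEN>", "<ENV_NAME>")
        // HIGH allows up to 100 batches per upload cycle, trading CPU/memory for throughput.
        .setBatchProcessingLevel(BatchProcessingLevel.HIGH)
        .build()
}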
Configuration.kt
@@ -38,7 +38,8 @@ internal constructor(
val proxy: Proxy?,
val proxyAuth: Authenticator,
val encryption: Encryption?,
val site: DatadogSite
val site: DatadogSite,
val batchProcessingLevel: BatchProcessingLevel
)

// region Builder
@@ -166,6 +167,11 @@
return this
}

/**
* Sets the preferred batch processing level, i.e. how many batches are sent in a single upload cycle.
* @param batchProcessingLevel the desired batch processing level (default: [BatchProcessingLevel.MEDIUM]).
*/
fun setBatchProcessingLevel(batchProcessingLevel: BatchProcessingLevel): Builder {
coreConfig = coreConfig.copy(batchProcessingLevel = batchProcessingLevel)
return this
}

/**
* Allows to provide additional configuration values which can be used by the SDK.
* @param additionalConfig Additional configuration values.
@@ -238,7 +244,8 @@
proxy = null,
proxyAuth = Authenticator.NONE,
encryption = null,
site = DatadogSite.US1
site = DatadogSite.US1,
batchProcessingLevel = BatchProcessingLevel.MEDIUM
)

internal const val NETWORK_REQUESTS_TRACKING_FEATURE_NAME = "Network requests"
CoreFeature.kt
@@ -18,6 +18,7 @@ import com.datadog.android.BuildConfig
import com.datadog.android.DatadogSite
import com.datadog.android.api.InternalLogger
import com.datadog.android.core.allowThreadDiskReads
import com.datadog.android.core.configuration.BatchProcessingLevel
import com.datadog.android.core.configuration.BatchSize
import com.datadog.android.core.configuration.Configuration
import com.datadog.android.core.configuration.UploadFrequency
@@ -129,6 +130,7 @@ internal class CoreFeature(
internal var variant: String = ""
internal var batchSize: BatchSize = BatchSize.MEDIUM
internal var uploadFrequency: UploadFrequency = UploadFrequency.AVERAGE
internal var batchProcessingLevel: BatchProcessingLevel = BatchProcessingLevel.MEDIUM
internal var ndkCrashHandler: NdkCrashHandler = NoOpNdkCrashHandler()
internal var site: DatadogSite = DatadogSite.US1

SdkFeature.kt
@@ -71,7 +71,10 @@ internal class SdkFeature(
var dataUploadConfiguration: DataUploadConfiguration? = null
if (wrappedFeature is StorageBackedFeature) {
val uploadFrequency = resolveUploadFrequency()
dataUploadConfiguration = DataUploadConfiguration(uploadFrequency)
dataUploadConfiguration = DataUploadConfiguration(
uploadFrequency,
coreFeature.batchProcessingLevel.maxBatchesPerUpload
)
val storageConfiguration = wrappedFeature.storageConfiguration
val recentDelayMs = resolveBatchingDelay(coreFeature, storageConfiguration)
val filePersistenceConfig = coreFeature.buildFilePersistenceConfig().copy(
DataUploadConfiguration.kt
@@ -8,10 +8,14 @@ package com.datadog.android.core.internal.configuration

import com.datadog.android.core.configuration.UploadFrequency

internal data class DataUploadConfiguration(internal val frequency: UploadFrequency) {
internal data class DataUploadConfiguration(
internal val frequency: UploadFrequency,
internal val maxBatchesPerUpload: Int
) {
internal val minDelayMs = MIN_DELAY_FACTOR * frequency.baseStepMs
internal val maxDelayMs = MAX_DELAY_FACTOR * frequency.baseStepMs
internal val defaultDelayMs = DEFAULT_DELAY_FACTOR * frequency.baseStepMs

companion object {
internal const val MIN_DELAY_FACTOR = 1
internal const val MAX_DELAY_FACTOR = 10
DataUploadRunnable.kt
@@ -44,6 +44,7 @@ internal class DataUploadRunnable(
internal var currentDelayIntervalMs = uploadConfiguration.defaultDelayMs
internal val minDelayMs = uploadConfiguration.minDelayMs
internal val maxDelayMs = uploadConfiguration.maxDelayMs
private val maxBatchesPerAsyncJob = uploadConfiguration.maxBatchesPerUpload

// region Runnable

@@ -54,28 +55,13 @@
val context = contextProvider.context
// TODO RUMM-0000 it should be already on the worker thread and if readNextBatch is async,
// we should wait until it completes before scheduling further
val lock = CountDownLatch(1)
storage.readNextBatch(
noBatchCallback = {
increaseInterval()
lock.countDown()
}
) { batchId, reader ->
try {
val batch = reader.read()
val batchMeta = reader.currentMetadata()

consumeBatch(
context,
batchId,
batch,
batchMeta
)
} finally {
lock.countDown()
}
}
lock.await(batchUploadWaitTimeoutMs, TimeUnit.MILLISECONDS)
var batchConsumerCoins = maxBatchesPerAsyncJob
var lastBatchUploadStatus: UploadStatus
do {
batchConsumerCoins--
lastBatchUploadStatus = handleBatchUpload(context)
} while (batchConsumerCoins > 0 && lastBatchUploadStatus is UploadStatus.Success)
handleBatchConsumingJobFrequency(lastBatchUploadStatus)
}

scheduleNextUpload()
@@ -85,6 +71,40 @@

// region Internal

private fun handleBatchConsumingJobFrequency(lastBatchUploadStatus: UploadStatus) {
if (lastBatchUploadStatus is UploadStatus.Success) {
decreaseInterval()
} else {
increaseInterval()
}
}

private fun handleBatchUpload(context: DatadogContext): UploadStatus {
var uploadStatus: UploadStatus = UploadStatus.UnknownStatus
val lock = CountDownLatch(1)
storage.readNextBatch(
noBatchCallback = {
lock.countDown()
}
) { batchId, reader ->
try {
val batch = reader.read()
val batchMeta = reader.currentMetadata()

uploadStatus = consumeBatch(
context,
batchId,
batch,
batchMeta
)
} finally {
lock.countDown()
}
}
lock.await(batchUploadWaitTimeoutMs, TimeUnit.MILLISECONDS)
return uploadStatus
}

private fun isNetworkAvailable(): Boolean {
val networkInfo = networkInfoProvider.getLatestNetworkInfo()
return networkInfo.connectivity != NetworkInfo.Connectivity.NETWORK_NOT_CONNECTED
@@ -115,7 +135,7 @@
batchId: BatchId,
batch: List<RawBatchEvent>,
batchMeta: ByteArray?
) {
): UploadStatus {
val status = dataUploader.upload(context, batch, batchMeta)
val removalReason = if (status is UploadStatus.RequestCreationError) {
RemovalReason.Invalid
@@ -125,12 +145,11 @@
storage.confirmBatchRead(batchId, removalReason) {
if (status.shouldRetry) {
it.markAsRead(false)
increaseInterval()
} else {
it.markAsRead(true)
decreaseInterval()
}
}
return status
}

@Suppress("UnsafeThirdPartyFunctionCall") // rounded Double isn't NaN
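
To summarize the behavioral change in DataUploadRunnable: instead of reading and uploading a single batch per scheduled run, each run now attempts up to maxBatchesPerUpload consecutive uploads (1, 10 or 100 depending on the configured level), stops at the first non-success status, and adjusts the upload interval once based on the last status rather than after every batch. A simplified, self-contained sketch of that control flow follows; SketchStatus and uploadNextBatch are hypothetical stand-ins for the SDK's UploadStatus and handleBatchUpload(context) plumbing.

// Hypothetical, simplified illustration of the new per-run upload loop (not SDK code).
enum class SketchStatus { SUCCESS, RETRYABLE_ERROR, NO_BATCH }

fun runUploadCycle(maxBatchesPerUpload: Int, uploadNextBatch: () -> SketchStatus) {
    var remainingBatches = maxBatchesPerUpload
    var lastStatus: SketchStatus
    do {
        remainingBatches--
        // Reads the next batch from storage and uploads it, mirroring handleBatchUpload().
        lastStatus = uploadNextBatch()
    } while (remainingBatches > 0 && lastStatus == SketchStatus.SUCCESS)

    // The polling interval is adjusted once per run, based on the last observed status.
    if (lastStatus == SketchStatus.SUCCESS) {
        // decreaseInterval(): uploads are succeeding, poll more often.
    } else {
        // increaseInterval(): back off when there is nothing to send or the upload failed.
    }
}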
