Skip to content

Commit

Permalink
[Dataflow Streaming] Make SideInputCache bytes and expiry configurable (
Browse files Browse the repository at this point in the history
apache#29871)

Co-authored-by: Arun Pandian <pandiana@google.com>
  • Loading branch information
arunpandianp and arunpandianp authored Jan 9, 2024
1 parent 094eb7d commit b666c64
Show file tree
Hide file tree
Showing 5 changed files with 36 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -239,15 +239,27 @@ public Dataflow create(PipelineOptions options) {
/**
* The size of the worker's in-memory cache, in megabytes.
*
* <p>Currently, this cache is used for storing read values of side inputs. as well as the state
* for streaming jobs.
* <p>Currently, this cache is used for storing read values of side inputs in batch as well as the
* user state for streaming jobs.
*/
@Description("The size of the worker's in-memory cache, in megabytes.")
@Default.Integer(100)
Integer getWorkerCacheMb();

void setWorkerCacheMb(Integer value);

@Description("The size of the streaming worker's side input cache, in megabytes.")
@Default.Integer(100)
Integer getStreamingSideInputCacheMb();

void setStreamingSideInputCacheMb(Integer value);

@Description("The expiry for streaming worker's side input cache entries, in milliseconds.")
@Default.Integer(60 * 1000) // 1 minute
Integer getStreamingSideInputCacheExpirationMillis();

void setstreamingSideInputCacheExpirationMillis(Integer value);

/**
* The amount of time before UnboundedReaders are considered idle and closed during streaming
* execution.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -424,7 +424,7 @@ public void run() {
this.metricTrackingWindmillServer =
new MetricTrackingWindmillServerStub(windmillServer, memoryMonitor, windmillServiceEnabled);
this.metricTrackingWindmillServer.start();
this.sideInputStateFetcher = new SideInputStateFetcher(metricTrackingWindmillServer);
this.sideInputStateFetcher = new SideInputStateFetcher(metricTrackingWindmillServer, options);
this.clientId = clientIdGenerator.nextLong();

for (MapTask mapTask : mapTasks) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.beam.runners.dataflow.options.DataflowPipelineDebugOptions;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TypeDescriptor;
Expand All @@ -40,20 +41,20 @@
@CheckReturnValue
final class SideInputCache {

private static final long MAXIMUM_CACHE_WEIGHT = 100000000; /* 100 MB */
private static final long CACHE_ENTRY_EXPIRY_MINUTES = 1L;
private static final long BYTES_PER_MB = 1024 * 1024;

private final Cache<Key<?>, SideInput<?>> sideInputCache;

SideInputCache(Cache<Key<?>, SideInput<?>> sideInputCache) {
this.sideInputCache = sideInputCache;
}

static SideInputCache create() {
static SideInputCache create(DataflowPipelineDebugOptions options) {
return new SideInputCache(
CacheBuilder.newBuilder()
.maximumWeight(MAXIMUM_CACHE_WEIGHT)
.expireAfterWrite(CACHE_ENTRY_EXPIRY_MINUTES, TimeUnit.MINUTES)
.maximumWeight(options.getStreamingSideInputCacheMb() * BYTES_PER_MB)
.expireAfterWrite(
options.getStreamingSideInputCacheExpirationMillis(), TimeUnit.MILLISECONDS)
.weigher((Weigher<Key<?>, SideInput<?>>) (id, entry) -> entry.size())
.build());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.util.concurrent.Callable;
import javax.annotation.concurrent.NotThreadSafe;
import org.apache.beam.runners.core.InMemoryMultimapSideInputView;
import org.apache.beam.runners.dataflow.options.DataflowPipelineDebugOptions;
import org.apache.beam.runners.dataflow.worker.MetricTrackingWindmillServerStub;
import org.apache.beam.runners.dataflow.worker.WindmillTimeUtils;
import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
Expand Down Expand Up @@ -61,8 +62,9 @@ public class SideInputStateFetcher {
private final MetricTrackingWindmillServerStub server;
private long bytesRead = 0L;

public SideInputStateFetcher(MetricTrackingWindmillServerStub server) {
this(server, SideInputCache.create());
public SideInputStateFetcher(
MetricTrackingWindmillServerStub server, DataflowPipelineDebugOptions options) {
this(server, SideInputCache.create(options));
}

SideInputStateFetcher(MetricTrackingWindmillServerStub server, SideInputCache sideInputCache) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,15 @@
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.beam.runners.dataflow.options.DataflowPipelineDebugOptions;
import org.apache.beam.runners.dataflow.worker.MetricTrackingWindmillServerStub;
import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.ListCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.Sum;
Expand Down Expand Up @@ -79,7 +81,9 @@ public void setUp() {

@Test
public void testFetchGlobalDataBasic() throws Exception {
SideInputStateFetcher fetcher = new SideInputStateFetcher(server);
SideInputStateFetcher fetcher =
new SideInputStateFetcher(
server, PipelineOptionsFactory.as(DataflowPipelineDebugOptions.class));

ByteStringOutputStream stream = new ByteStringOutputStream();
ListCoder.of(StringUtf8Coder.of())
Expand Down Expand Up @@ -147,7 +151,9 @@ public void testFetchGlobalDataBasic() throws Exception {

@Test
public void testFetchGlobalDataNull() throws Exception {
SideInputStateFetcher fetcher = new SideInputStateFetcher(server);
SideInputStateFetcher fetcher =
new SideInputStateFetcher(
server, PipelineOptionsFactory.as(DataflowPipelineDebugOptions.class));

ByteStringOutputStream stream = new ByteStringOutputStream();
ListCoder.of(VoidCoder.of())
Expand Down Expand Up @@ -302,7 +308,9 @@ public void testFetchGlobalDataCacheOverflow() throws Exception {

@Test
public void testEmptyFetchGlobalData() throws Exception {
SideInputStateFetcher fetcher = new SideInputStateFetcher(server);
SideInputStateFetcher fetcher =
new SideInputStateFetcher(
server, PipelineOptionsFactory.as(DataflowPipelineDebugOptions.class));

ByteString encodedIterable = ByteString.EMPTY;

Expand Down

0 comments on commit b666c64

Please sign in to comment.