Skip to content

Commit

Permalink
fix: fix CloudBigtableIO scan to take version and filters (#3901)
Browse files Browse the repository at this point in the history
* WIP: TODO: fix row adaptor and test

* WIP: todo: continue fixing read test, refactor CloudBigtableScanConfiguration

* fixed tests

* clean up debugging logs

* update

* fix tests and update

* fix class name

* remove debug logging

* add some doc, reformat

* clean up code

* refactor

* add some docs

* fix test

* update

* refactor

* refactor 2

* fix some typos

* remove some classes

* remove helper methods
  • Loading branch information
mutianf authored Mar 30, 2023
1 parent f4c7833 commit 307874f
Show file tree
Hide file tree
Showing 14 changed files with 506 additions and 221 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Copyright 2022 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.cloud.bigtable.hbase;

import com.google.api.core.InternalApi;
import com.google.bigtable.v2.ReadRowsRequest;
import org.apache.hadoop.hbase.client.Scan;

/**
 * A {@link Scan} wrapper that carries a pre-built Bigtable {@link ReadRowsRequest}.
 *
 * <p>When a scan of this type is adapted, the embedded request is used verbatim instead of being
 * translated from HBase {@link Scan} attributes, allowing callers to express read options that have
 * no HBase equivalent.
 */
@InternalApi
public class BigtableFixedProtoScan extends Scan {

  // Immutable after construction: this wrapper only transports the proto.
  private final ReadRowsRequest request;

  /**
   * Creates a wrapper around a fully-built read request.
   *
   * @param request the request to replay verbatim; callers must not mutate it afterwards
   */
  public BigtableFixedProtoScan(ReadRowsRequest request) {
    this.request = request;
  }

  /** Returns the wrapped {@link ReadRowsRequest}. */
  public ReadRowsRequest getRequest() {
    return request;
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ public RowMutationEntry adaptEntry(Delete delete) {
public Query adapt(Get get) {
ReadHooks readHooks = new DefaultReadHooks();
Query query = Query.create(getTableId());
Adapters.GET_ADAPTER.adapt(get, readHooks, query);
query = Adapters.GET_ADAPTER.adapt(get, readHooks, query);
readHooks.applyPreSendHook(query);
return query;
}
Expand All @@ -166,7 +166,7 @@ public Query adapt(Get get) {
public Query adapt(Scan scan) {
ReadHooks readHooks = new DefaultReadHooks();
Query query = Query.create(getTableId());
Adapters.SCAN_ADAPTER.adapt(scan, readHooks, query);
query = Adapters.SCAN_ADAPTER.adapt(scan, readHooks, query);
readHooks.applyPreSendHook(query);
return query;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,11 @@ public GetAdapter(ScanAdapter scanAdapter) {

/** {@inheritDoc} */
@Override
public void adapt(Get operation, ReadHooks readHooks, Query query) {
public Query adapt(Get operation, ReadHooks readHooks, Query query) {
Scan operationAsScan = new Scan(addKeyOnlyFilter(operation));
scanAdapter.throwIfUnsupportedScan(operationAsScan);

query
return query
.filter(scanAdapter.buildFilter(operationAsScan, readHooks))
.rowKey(ByteString.copyFrom(operation.getRow()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,5 +33,5 @@ public interface ReadOperationAdapter<T extends Operation> {
* @param readHooks a {@link ReadHooks} object.
* @param query a {@link Query} object.
*/
void adapt(T request, ReadHooks readHooks, Query query);
Query adapt(T request, ReadHooks readHooks, Query query);
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.google.cloud.bigtable.data.v2.models.Filters.TimestampRangeFilter;
import com.google.cloud.bigtable.data.v2.models.Query;
import com.google.cloud.bigtable.hbase.BigtableExtendedScan;
import com.google.cloud.bigtable.hbase.BigtableFixedProtoScan;
import com.google.cloud.bigtable.hbase.adapters.filters.FilterAdapter;
import com.google.cloud.bigtable.hbase.adapters.filters.FilterAdapterContext;
import com.google.cloud.bigtable.hbase.util.RowKeyWrapper;
Expand All @@ -51,7 +52,6 @@
*/
@InternalApi("For internal usage only")
public class ScanAdapter implements ReadOperationAdapter<Scan> {

private static final int UNSET_MAX_RESULTS_PER_COLUMN_FAMILY = -1;
private static final boolean OPEN_CLOSED_AVAILABLE = isOpenClosedAvailable();
private static final boolean LIMIT_AVAILABLE = isLimitAvailable();
Expand Down Expand Up @@ -156,14 +156,18 @@ private List<Filters.Filter> buildStartFilter(Scan scan) {

/** {@inheritDoc} */
@Override
public void adapt(Scan scan, ReadHooks readHooks, Query query) {
throwIfUnsupportedScan(scan);

toByteStringRange(scan, query);
query.filter(buildFilter(scan, readHooks));
public Query adapt(Scan scan, ReadHooks readHooks, Query query) {
if (scan instanceof BigtableFixedProtoScan) {
return Query.fromProto(((BigtableFixedProtoScan) scan).getRequest());
} else {
throwIfUnsupportedScan(scan);
toByteStringRange(scan, query);
query.filter(buildFilter(scan, readHooks));

if (LIMIT_AVAILABLE && scan.getLimit() > 0) {
query.limit(scan.getLimit());
if (LIMIT_AVAILABLE && scan.getLimit() > 0) {
query.limit(scan.getLimit());
}
return query;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.google.cloud.bigtable.data.v2.models.Filters;
import com.google.cloud.bigtable.data.v2.models.Query;
import com.google.cloud.bigtable.hbase.BigtableExtendedScan;
import com.google.cloud.bigtable.hbase.BigtableFixedProtoScan;
import com.google.cloud.bigtable.hbase.adapters.filters.FilterAdapter;
import com.google.cloud.bigtable.hbase.adapters.filters.FilterAdapterContext;
import com.google.cloud.bigtable.hbase.util.ByteStringer;
Expand Down Expand Up @@ -448,4 +449,17 @@ public void testMaxVersionsWithTimeRanges() throws IOException {
.filter(FILTERS.key().regex("blah\\C*"));
Assert.assertEquals(expected.toProto(), query.toProto(requestContext).getFilter());
}

/** A fixed-proto scan must be replayed verbatim, ignoring the query passed to the adapter. */
@Test
public void testFixedRequest() {
  BigtableFixedProtoScan wrappedScan =
      new BigtableFixedProtoScan(query.limit(10).toProto(requestContext));

  // The adapter should discard this query entirely in favor of the wrapped proto.
  Query ignoredInput = Query.create("PLACEHOLDER");
  Query adapted = scanAdapter.adapt(wrappedScan, throwingReadHooks, ignoredInput);

  Query wanted = Query.create("tableId").limit(10);
  Assert.assertEquals(wanted.toProto(requestContext), adapted.toProto(requestContext));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,27 +15,12 @@
*/
package com.google.cloud.bigtable.beam;

import static com.google.cloud.bigtable.beam.CloudBigtableScanConfiguration.PLACEHOLDER_APP_PROFILE_ID;
import static com.google.cloud.bigtable.beam.CloudBigtableScanConfiguration.PLACEHOLDER_INSTANCE_ID;
import static com.google.cloud.bigtable.beam.CloudBigtableScanConfiguration.PLACEHOLDER_PROJECT_ID;
import static com.google.cloud.bigtable.beam.CloudBigtableScanConfiguration.PLACEHOLDER_TABLE_ID;

import com.google.bigtable.repackaged.com.google.api.core.InternalApi;
import com.google.bigtable.repackaged.com.google.bigtable.v2.ReadRowsRequest;
import com.google.bigtable.repackaged.com.google.cloud.bigtable.data.v2.internal.RequestContext;
import com.google.bigtable.repackaged.com.google.cloud.bigtable.data.v2.models.Query;
import com.google.cloud.bigtable.beam.sequencefiles.ExportJob.ExportOptions;
import com.google.cloud.bigtable.beam.sequencefiles.ImportJob.ImportOptions;
import com.google.cloud.bigtable.beam.validation.SyncTableJob.SyncTableOptions;
import com.google.cloud.bigtable.hbase.BigtableOptionsFactory;
import com.google.cloud.bigtable.hbase.adapters.Adapters;
import com.google.cloud.bigtable.hbase.adapters.read.DefaultReadHooks;
import com.google.cloud.bigtable.hbase.adapters.read.ReadHooks;
import java.io.Serializable;
import java.nio.charset.CharacterCodingException;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.ParseFilter;

/**
* !!! DO NOT TOUCH THIS CLASS !!!
Expand Down Expand Up @@ -92,77 +77,8 @@ public static CloudBigtableTableConfiguration buildSyncTableConfig(SyncTableOpti
return builder.build();
}

/**
 * Lazily builds a {@link ReadRowsRequest} from the export job's start/stop row, max-versions and
 * filter options, substituting placeholder project/instance/table identifiers.
 *
 * <p>The built proto has its table name and app profile cleared so the real values can be filled
 * in later by the consumer.
 */
private static class RequestValueProvider
implements ValueProvider<ReadRowsRequest>, Serializable {
// Raw runtime parameters from ExportOptions; each may be inaccessible until pipeline execution.
private final ValueProvider<String> start;
private final ValueProvider<String> stop;
private final ValueProvider<Integer> maxVersion;
private final ValueProvider<String> filter;
// Built once on first get() and reused afterwards.
// NOTE(review): caching is unsynchronized — assumes single-threaded access; verify.
private ReadRowsRequest cachedRequest;

RequestValueProvider(ExportOptions options) {
this.start = options.getBigtableStartRow();
this.stop = options.getBigtableStopRow();
this.maxVersion = options.getBigtableMaxVersions();
this.filter = options.getBigtableFilter();
}

/** Builds (on first call) and returns the request derived from the configured options. */
@Override
public ReadRowsRequest get() {
if (cachedRequest == null) {
// Translate the options into an HBase Scan, then adapt that Scan into a Bigtable Query.
Scan scan = new Scan();
// Empty strings are treated the same as unset: the bound is left open.
if (start.get() != null && !start.get().isEmpty()) {
// NOTE(review): getBytes() uses the platform default charset — confirm UTF-8 is intended.
scan.setStartRow(start.get().getBytes());
}
if (stop.get() != null && !stop.get().isEmpty()) {
scan.setStopRow(stop.get().getBytes());
}
if (maxVersion.get() != null) {
scan.setMaxVersions(maxVersion.get());
}
if (filter.get() != null && !filter.get().isEmpty()) {
try {
// Filter is given in HBase's string filter language, e.g. "PrefixFilter('abc')".
scan.setFilter(new ParseFilter().parseFilterString(filter.get()));
} catch (CharacterCodingException e) {
throw new RuntimeException(e);
}
}

ReadHooks readHooks = new DefaultReadHooks();
// Placeholders are used because the real identifiers are only known at execution time.
Query query = Query.create(PLACEHOLDER_TABLE_ID);
Adapters.SCAN_ADAPTER.adapt(scan, readHooks, query);
readHooks.applyPreSendHook(query);
RequestContext requestContext =
RequestContext.create(
PLACEHOLDER_PROJECT_ID, PLACEHOLDER_INSTANCE_ID, PLACEHOLDER_APP_PROFILE_ID);

// Clear table name and app profile so the consumer can substitute the real values.
cachedRequest =
query.toProto(requestContext).toBuilder().setTableName("").setAppProfileId("").build();
}
return cachedRequest;
}

/** Accessible only once every underlying runtime parameter is accessible. */
@Override
public boolean isAccessible() {
return start.isAccessible()
&& stop.isAccessible()
&& maxVersion.isAccessible()
&& filter.isAccessible();
}

/** Renders the built request, or a sentinel when the options are not yet accessible. */
@Override
public String toString() {
if (isAccessible()) {
return String.valueOf(get());
}
return CloudBigtableConfiguration.VALUE_UNAVAILABLE;
}
}

/** Builds CloudBigtableScanConfiguration from input runtime parameters for export job. */
public static CloudBigtableScanConfiguration buildExportConfig(ExportOptions options) {
ValueProvider<ReadRowsRequest> request = new RequestValueProvider(options);
CloudBigtableScanConfiguration.Builder configBuilder =
new CloudBigtableScanConfiguration.Builder()
.withProjectId(options.getBigtableProject())
Expand All @@ -171,7 +87,12 @@ public static CloudBigtableScanConfiguration buildExportConfig(ExportOptions opt
.withAppProfileId(options.getBigtableAppProfileId())
.withConfiguration(
BigtableOptionsFactory.CUSTOM_USER_AGENT_KEY, "SequenceFileExportJob")
.withRequest(request);
.withScan(
new ScanValueProvider(
options.getBigtableStartRow(),
options.getBigtableStopRow(),
options.getBigtableMaxVersions(),
options.getBigtableFilter()));

return configBuilder.build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,8 +189,30 @@ public String getInstanceId() {
return configuration.get(BigtableOptionsFactory.INSTANCE_ID_KEY).get();
}

/**
 * Returns the raw {@link ValueProvider} backing the project id setting.
 *
 * @return the project id value provider
 */
ValueProvider<String> getProjectIdValueProvider() {
  ValueProvider<String> projectId = configuration.get(BigtableOptionsFactory.PROJECT_ID_KEY);
  return projectId;
}

/**
 * Returns the raw {@link ValueProvider} backing the instance id setting.
 *
 * @return the instance id value provider
 */
ValueProvider<String> getInstanceIdValueProvider() {
  ValueProvider<String> instanceId = configuration.get(BigtableOptionsFactory.INSTANCE_ID_KEY);
  return instanceId;
}

/**
 * Gets the Cloud Bigtable app profile id, falling back to {@code "default"} when the profile was
 * never configured or its runtime value is absent.
 *
 * @return the configured app profile id, or {@code "default"}
 */
public String getAppProfileId() {
  // Look the provider up once instead of re-querying the configuration map for each check.
  ValueProvider<String> appProfileId =
      configuration.get(BigtableOptionsFactory.APP_PROFILE_ID_KEY);
  if (appProfileId == null || appProfileId.get() == null) {
    return "default";
  }
  return appProfileId.get();
}

Expand Down
Loading

0 comments on commit 307874f

Please sign in to comment.