Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: two-way replication for Hbase-Bigtable replication #3920

Merged
merged 34 commits into from
Jun 14, 2023
Merged
Show file tree
Hide file tree
Changes from 31 commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
cbf037f
Marked out places for loop prevention code change
georgecma Jan 11, 2023
6885504
Two-way replicator special mutation tagging works in unit tests
georgecma Jan 11, 2023
69f2ed7
Added metrics for de-looped vs replicated cells
georgecma Jan 12, 2023
e9dbf89
Added test to verify metrics for two-way replication cell suppression…
georgecma Jan 12, 2023
603c8f1
Added more tests and test comments for two-way replication behavior
georgecma Jan 12, 2023
9350829
Added tests to test custom special column qualifier functionality, re…
georgecma Jan 12, 2023
4726e6f
removed obsolete comments from adapter
georgecma Jan 12, 2023
aed97f4
Cleaned up comments in test to be more readable
georgecma Jan 12, 2023
d736c4f
removed unnused imports
georgecma Jan 14, 2023
cddc459
quick edit
georgecma Jan 14, 2023
7b94a9c
🦉 Updates from OwlBot post-processor
gcf-owl-bot[bot] Jan 19, 2023
7342b3b
Moved two-way-replication functions to be more readable
georgecma Jan 24, 2023
39a8ebf
Two way replication functions created to make replicateTable loop mor…
georgecma Jan 24, 2023
1f3af6a
minor format changes
georgecma Jan 24, 2023
06e9240
Cleaned up replicator
georgecma Jan 24, 2023
1ee0df6
Resolved comments on hbase-migration-tools-core
georgecma Feb 7, 2023
e250fdd
renamed configuration string to bidirectional replication
georgecma Feb 7, 2023
f6ec060
renamed all two-way to bidirectional
georgecma Feb 7, 2023
e0c0984
Added integration tests
georgecma Feb 7, 2023
e50193c
Removed unused import from ReplicatioNEndpoint
georgecma Feb 7, 2023
8b5372e
Changed functions according to comments
georgecma Feb 7, 2023
cf9c9f9
extrapolated ROW_KEY_2 in tests to testUtils
georgecma Feb 7, 2023
34e3555
auto enable bidirectional replication settings
georgecma Feb 20, 2023
0710077
Renamed source qualifiers to match bigtable side qualifiers
georgecma Feb 22, 2023
7a970fb
Added finer-granularity metrics to replicator
georgecma Feb 24, 2023
f642522
Added complex test for bidirectional replication
georgecma Mar 23, 2023
bdf34fd
Simplified implementation and linted
georgecma Jun 9, 2023
978497d
Updated readme with bidirectional replication
georgecma Jun 12, 2023
a14445a
Added more technical explanation for bidirectional replication
georgecma Jun 12, 2023
0d7ec83
updated readme - bigtable-hbase template link updated from pr to code…
georgecma Jun 12, 2023
105f0db
Added custom user agent tag for bidirectional replication
georgecma Jun 12, 2023
caa0804
Linted files
georgecma Jun 12, 2023
e2bcceb
Added short blurb on a potential data divergence scenario
georgecma Jun 13, 2023
0b38a9c
Add metrics and fix typo in readme, change replicator info logs to us…
georgecma Jun 14, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions hbase-migration-tools/bigtable-hbase-replication/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,34 @@ Please refer
to [HBaseToCloudBigtableReplicationConfiguration](bigtable-hbase-replication-core/src/main/java/com/google/cloud/bigtable/hbase/replication/configuration/HBaseToCloudBigtableReplicationConfiguration.java)
for other properties that can be set.

### Bidirectional Replication with Cloud Bigtable

To enable bidirectional replication support with Cloud Bigtable, add the following property to `hbase-site.xml`.
```
<property>
<name>google.bigtable.replication.enable_bidirectional_replication</name>
<value>true</value>
</property>
```

This feature filters out mutations replicated to Hbase from Cloud Bigtable change streams with the [Bigtable-Hbase replicator](https://github.com/GoogleCloudPlatform/DataflowTemplates/tree/main/v2/bigtable-cdc-to-hbase) to prevent replication loops.

More technically, the feature's logic checks if the last mutation of every outgoing RowMutation contains a Delete mutation on a column qualifier that matches the `cbt_qualifier` property value. If it does, that RowMutation is filtered out and not replicated.

Every replicated mutation it sends out is tagged with a Delete mutation on a qualifier set with the `hbase_qualifier` property value. The Bigtable-Hbase replicator has the same logic to filter out `hbase_qualifier` Delete mutations.

The default qualifier values are already set in [HBaseToCloudBigtableReplicationConfiguration](bigtable-hbase-replication-core/src/main/java/com/google/cloud/bigtable/hbase/replication/configuration/HBaseToCloudBigtableReplicationConfiguration.java), but can also be manually set in `hbase-site.xml`.
```
<property>
<name>google.bigtable.replication.hbase_qualifier</name>
<value>REPLICATED_FROM_HBASE</value>
</property>
<property>
<name>google.bigtable.replication.cbt_qualifier</name>
<value>REPLICATED_FROM_CLOUD_BIGTABLE</value>
</property>
```

## Deployment

Use the replication library version corresponding to your HBase version. For
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,201 @@
/*
* Copyright 2023 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.bigtable.hbase1_x.replication;

import static com.google.cloud.bigtable.hbase.replication.utils.TestUtils.CF2;

import com.google.cloud.bigtable.hbase.BigtableConfiguration;
import com.google.cloud.bigtable.hbase.replication.utils.TestUtils;
import com.google.cloud.bigtable.test.helper.BigtableEmulatorRule;
import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RowMutations;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint.ReplicateContext;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Test bidirectional replication.
* This test is separate from the other endpoint tests because it requires spinning up a
* cluster with additional config settings.
*/
@RunWith(JUnit4.class)
public class HbaseToCloudBigtableBidirectionalReplicationEndpointTest {

public static class TestReplicationEndpoint extends HbaseToCloudBigtableReplicationEndpoint {

static AtomicInteger replicatedEntries = new AtomicInteger();

@Override
public boolean replicate(ReplicateContext replicateContext) {
boolean result = super.replicate(replicateContext);
replicatedEntries.getAndAdd(replicateContext.getEntries().size());
return result;
}
}

private static final Logger LOG =
LoggerFactory.getLogger(HbaseToCloudBigtableBidirectionalReplicationEndpointTest.class);

private static HBaseTestingUtility hbaseTestingUtil;
private static ReplicationAdmin replicationAdmin;

@ClassRule
public static final BigtableEmulatorRule bigtableEmulator = new BigtableEmulatorRule();

private static Connection cbtConnection;
private static Connection hbaseConnection;

private Table hbaseTable;
private Table cbtTable;

private static byte[] cbtQualifier = "customCbtQualifier".getBytes();
private static byte[] hbaseQualifier = "customHbaseQualifier".getBytes();

@BeforeClass
public static void setUpCluster() throws Exception {
// Prepare HBase mini cluster configuration
Configuration conf = new HBaseTestingUtility().getConfiguration();
// Set CBT related configs.
conf.set("google.bigtable.instance.id", "test-instance");
conf.set("google.bigtable.project.id", "test-project");
// This config will connect Replication endpoint to the emulator and not the prod CBT.
conf.set("google.bigtable.emulator.endpoint.host", "localhost:" + bigtableEmulator.getPort());
// Set bidirectional replication related settings
conf.set("google.bigtable.replication.enable_bidirectional_replication", "true");
conf.set("google.bigtable.replication.hbase_qualifier", new String(hbaseQualifier));
conf.set("google.bigtable.replication.cbt_qualifier", new String(cbtQualifier));

hbaseTestingUtil = new HBaseTestingUtility(conf);
hbaseTestingUtil.startMiniCluster(2);
replicationAdmin = new ReplicationAdmin(hbaseTestingUtil.getConfiguration());

cbtConnection = BigtableConfiguration.connect(conf);
hbaseConnection = hbaseTestingUtil.getConnection();

// Setup Replication in HBase mini cluster
ReplicationPeerConfig peerConfig = new ReplicationPeerConfig();
peerConfig.setReplicationEndpointImpl(
TestReplicationEndpoint.class.getTypeName());
// Cluster key is required, we don't really have a clusterKey for CBT.
peerConfig.setClusterKey(hbaseTestingUtil.getClusterKey());
replicationAdmin.addPeer("cbt", peerConfig);
}

@AfterClass
public static void tearDown() throws Exception {
cbtConnection.close();
hbaseConnection.close();
replicationAdmin.close();
hbaseTestingUtil.shutdownMiniCluster();
}

@After
public void tearDownTable() throws IOException {
cbtTable.close();
hbaseTable.close();
}

@Before
public void setupTestCase() throws IOException {

// Create and set the empty tables
TableName table1 = TableName.valueOf(UUID.randomUUID().toString());
createTables(table1, HConstants.REPLICATION_SCOPE_GLOBAL, HConstants.REPLICATION_SCOPE_GLOBAL);

cbtTable = cbtConnection.getTable(table1);
hbaseTable = hbaseConnection.getTable(table1);

// Reset the entry counts for TestReplicationEndpoint
TestReplicationEndpoint.replicatedEntries.set(0);
}

private void createTables(TableName tableName, int cf1Scope, int cf2Scope) throws IOException {
// Create table in HBase
HTableDescriptor htd = hbaseTestingUtil.createTableDescriptor(tableName.getNameAsString());
HColumnDescriptor cf1 = new HColumnDescriptor(TestUtils.CF1);
cf1.setMaxVersions(100);
htd.addFamily(cf1);
HColumnDescriptor cf2 = new HColumnDescriptor(CF2);
cf2.setMaxVersions(100);
htd.addFamily(cf2);

// Enables replication to all peers, including CBT
cf1.setScope(cf1Scope);
cf2.setScope(cf2Scope);
hbaseTestingUtil.getHBaseAdmin().createTable(htd);
cbtConnection.getAdmin().createTable(htd);
}

/**
* Bidirectional replication should replicate source entry and drop cbt-replicated entry to
* prevent loops from forming.
*/
@Test
public void testDropsReplicatedEntry() throws IOException, InterruptedException {
RowMutations mutationToDrop = new RowMutations(TestUtils.ROW_KEY);
mutationToDrop.add(new Put(TestUtils.ROW_KEY).addColumn(TestUtils.CF1, TestUtils.COL_QUALIFIER, 0, TestUtils.VALUE));
mutationToDrop.add(
// Special delete mutation signifying this came from Bigtable replicator
new Delete(TestUtils.ROW_KEY).addColumns(TestUtils.CF1, cbtQualifier, 0)
);
RowMutations mutationToReplicate = new RowMutations(TestUtils.ROW_KEY_2);
mutationToReplicate.add(
new Put(TestUtils.ROW_KEY_2).addColumn(TestUtils.CF1, TestUtils.COL_QUALIFIER, 0, TestUtils.VALUE)
);

hbaseTable.mutateRow(mutationToDrop);
hbaseTable.mutateRow(mutationToReplicate);

// Wait for replication
TestUtils.waitForReplication(
() -> {
// Only one entry should've been replicated
return TestReplicationEndpoint.replicatedEntries.get() >= 1;
});

// Hbase table should have both mutations
Assert.assertTrue(hbaseTable.get(new Get(TestUtils.ROW_KEY)).size() == 1);
Assert.assertTrue(hbaseTable.get(new Get(TestUtils.ROW_KEY_2)).size() == 1);
// Cbt table should have only one mutation
Assert.assertTrue(cbtTable.get(new Get(TestUtils.ROW_KEY)).isEmpty());
Assert.assertTrue(cbtTable.get(new Get(TestUtils.ROW_KEY_2)).size() == 1);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -181,5 +181,4 @@ protected void doStop() {
@Override
public void peerConfigUpdated(ReplicationPeerConfig replicationPeerConfig) {}
// TODO(we can implement this to enable/disable dry-run mode without deleting the peer)

}
Loading