
Commit 75e4136

iadamcsik-cldr authored and meszibalu committed
HBASE-23295 HBaseContext should use most recent delegation token (apache#47)
Signed-off-by: Balazs Meszaros <meszibalu@apache.org>
1 parent 3106694 commit 75e4136

3 files changed: +67 -91 lines changed

spark/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala

+1 -12

@@ -65,13 +65,11 @@ class HBaseContext(@transient val sc: SparkContext,
     val tmpHdfsConfgFile: String = null)
   extends Serializable with Logging {
 
-  @transient var credentials = UserGroupInformation.getCurrentUser().getCredentials()
   @transient var tmpHdfsConfiguration:Configuration = config
   @transient var appliedCredentials = false
   @transient val job = Job.getInstance(config)
   TableMapReduceUtil.initCredentials(job)
   val broadcastedConf = sc.broadcast(new SerializableWritable(config))
-  val credentialsConf = sc.broadcast(new SerializableWritable(job.getCredentials))
 
   LatestHBaseContextCache.latest = this
 
@@ -233,21 +231,12 @@ class HBaseContext(@transient val sc: SparkContext,
   }
 
   def applyCreds[T] (){
-    credentials = UserGroupInformation.getCurrentUser().getCredentials()
-
-    if (log.isDebugEnabled) {
-      logDebug("appliedCredentials:" + appliedCredentials + ",credentials:" + credentials)
-    }
-
-    if (!appliedCredentials && credentials != null) {
+    if (!appliedCredentials) {
       appliedCredentials = true
 
       @transient val ugi = UserGroupInformation.getCurrentUser
-      ugi.addCredentials(credentials)
      // specify that this is a proxy user
       ugi.setAuthenticationMethod(AuthenticationMethod.PROXY)
-
-      ugi.addCredentials(credentialsConf.value.value)
     }
   }

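The net effect of the HBaseContext.scala change above is that credentials are no longer captured and broadcast when the HBaseContext is constructed; applyCreds() now only marks the current user as a proxy user, and executors pick up whatever delegation tokens are attached to the current UGI at execution time, which is what "most recent delegation token" refers to. Below is a minimal sketch, not part of the patch; listCurrentTokens is a hypothetical helper that only illustrates that UserGroupInformation exposes the live token collection the tasks now rely on.

import scala.collection.JavaConverters._
import org.apache.hadoop.security.UserGroupInformation

// Hypothetical helper, not part of HBaseContext: prints the tokens currently
// attached to the calling user. The tokens are read at call time, so a
// delegation token obtained after the HBaseContext was built is still visible.
def listCurrentTokens(): Unit = {
  val ugi = UserGroupInformation.getCurrentUser
  ugi.getTokens.asScala.foreach { token =>
    println(s"${token.getKind} for service ${token.getService}")
  }
}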
spark/hbase-spark/src/test/java/org/apache/hadoop/hbase/spark/TestJavaHBaseContext.java

+62 -72

@@ -52,8 +52,10 @@
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.api.java.function.Function;
 import org.junit.After;
+import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.Before;
+import org.junit.BeforeClass;
 import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -70,68 +72,68 @@ public class TestJavaHBaseContext implements Serializable {
   public static final HBaseClassTestRule TIMEOUT =
       HBaseClassTestRule.forClass(TestJavaHBaseContext.class);
 
-  private transient JavaSparkContext jsc;
-  HBaseTestingUtility htu;
-  protected static final Logger LOG = LoggerFactory.getLogger(TestJavaHBaseContext.class);
-
-
+  private static transient JavaSparkContext JSC;
+  private static HBaseTestingUtility TEST_UTIL;
+  private static JavaHBaseContext HBASE_CONTEXT;
+  private static final Logger LOG = LoggerFactory.getLogger(TestJavaHBaseContext.class);
 
   byte[] tableName = Bytes.toBytes("t1");
   byte[] columnFamily = Bytes.toBytes("c");
   byte[] columnFamily1 = Bytes.toBytes("d");
   String columnFamilyStr = Bytes.toString(columnFamily);
   String columnFamilyStr1 = Bytes.toString(columnFamily1);
 
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
 
-  @Before
-  public void setUp() {
-    jsc = new JavaSparkContext("local", "JavaHBaseContextSuite");
+    JSC = new JavaSparkContext("local", "JavaHBaseContextSuite");
+    TEST_UTIL = new HBaseTestingUtility();
+    Configuration conf = TEST_UTIL.getConfiguration();
 
-    File tempDir = Files.createTempDir();
-    tempDir.deleteOnExit();
+    HBASE_CONTEXT = new JavaHBaseContext(JSC, conf);
 
-    htu = new HBaseTestingUtility();
-    try {
-      LOG.info("cleaning up test dir");
+    LOG.info("cleaning up test dir");
 
-      htu.cleanupTestDir();
+    TEST_UTIL.cleanupTestDir();
 
-      LOG.info("starting minicluster");
+    LOG.info("starting minicluster");
 
-      htu.startMiniZKCluster();
-      htu.startMiniHBaseCluster(1, 1);
+    TEST_UTIL.startMiniZKCluster();
+    TEST_UTIL.startMiniHBaseCluster(1, 1);
 
-      LOG.info(" - minicluster started");
+    LOG.info(" - minicluster started");
+  }
 
-      try {
-        htu.deleteTable(TableName.valueOf(tableName));
-      } catch (Exception e) {
-        LOG.info(" - no table " + Bytes.toString(tableName) + " found");
-      }
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    LOG.info("shuting down minicluster");
+    TEST_UTIL.shutdownMiniHBaseCluster();
+    TEST_UTIL.shutdownMiniZKCluster();
+    LOG.info(" - minicluster shut down");
+    TEST_UTIL.cleanupTestDir();
 
-      LOG.info(" - creating table " + Bytes.toString(tableName));
-      htu.createTable(TableName.valueOf(tableName),
-        new byte[][]{columnFamily, columnFamily1});
-      LOG.info(" - created table");
-    } catch (Exception e1) {
-      throw new RuntimeException(e1);
-    }
+    JSC.stop();
+    JSC = null;
   }
 
-  @After
-  public void tearDown() {
+  @Before
+  public void setUp() throws Exception {
+
     try {
-      htu.deleteTable(TableName.valueOf(tableName));
-      LOG.info("shuting down minicluster");
-      htu.shutdownMiniHBaseCluster();
-      htu.shutdownMiniZKCluster();
-      LOG.info(" - minicluster shut down");
-      htu.cleanupTestDir();
+      TEST_UTIL.deleteTable(TableName.valueOf(tableName));
     } catch (Exception e) {
-      throw new RuntimeException(e);
+      LOG.info(" - no table {} found", Bytes.toString(tableName));
    }
-    jsc.stop();
-    jsc = null;
+
+    LOG.info(" - creating table {}", Bytes.toString(tableName));
+    TEST_UTIL.createTable(TableName.valueOf(tableName),
+      new byte[][]{columnFamily, columnFamily1});
+    LOG.info(" - created table");
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    TEST_UTIL.deleteTable(TableName.valueOf(tableName));
   }
 
   @Test
@@ -144,11 +146,9 @@ public void testBulkPut() throws IOException {
     list.add("4," + columnFamilyStr + ",a,4");
     list.add("5," + columnFamilyStr + ",a,5");
 
-    JavaRDD<String> rdd = jsc.parallelize(list);
-
-    Configuration conf = htu.getConfiguration();
+    JavaRDD<String> rdd = JSC.parallelize(list);
 
-    JavaHBaseContext hbaseContext = new JavaHBaseContext(jsc, conf);
+    Configuration conf = TEST_UTIL.getConfiguration();
 
     Connection conn = ConnectionFactory.createConnection(conf);
     Table table = conn.getTable(TableName.valueOf(tableName));
@@ -163,7 +163,7 @@ public void testBulkPut() throws IOException {
       table.close();
     }
 
-    hbaseContext.bulkPut(rdd,
+    HBASE_CONTEXT.bulkPut(rdd,
       TableName.valueOf(tableName),
       new PutFunction());
 
@@ -212,15 +212,13 @@ public void testBulkDelete() throws IOException {
     list.add(Bytes.toBytes("2"));
     list.add(Bytes.toBytes("3"));
 
-    JavaRDD<byte[]> rdd = jsc.parallelize(list);
+    JavaRDD<byte[]> rdd = JSC.parallelize(list);
 
-    Configuration conf = htu.getConfiguration();
+    Configuration conf = TEST_UTIL.getConfiguration();
 
     populateTableWithMockData(conf, TableName.valueOf(tableName));
 
-    JavaHBaseContext hbaseContext = new JavaHBaseContext(jsc, conf);
-
-    hbaseContext.bulkDelete(rdd, TableName.valueOf(tableName),
+    HBASE_CONTEXT.bulkDelete(rdd, TableName.valueOf(tableName),
       new JavaHBaseBulkDeleteExample.DeleteFunction(), 2);
 
 
@@ -248,17 +246,15 @@ public void testBulkDelete() throws IOException {
 
   @Test
   public void testDistributedScan() throws IOException {
-    Configuration conf = htu.getConfiguration();
+    Configuration conf = TEST_UTIL.getConfiguration();
 
     populateTableWithMockData(conf, TableName.valueOf(tableName));
 
-    JavaHBaseContext hbaseContext = new JavaHBaseContext(jsc, conf);
-
     Scan scan = new Scan();
     scan.setCaching(100);
 
     JavaRDD<String> javaRdd =
-      hbaseContext.hbaseRDD(TableName.valueOf(tableName), scan)
+      HBASE_CONTEXT.hbaseRDD(TableName.valueOf(tableName), scan)
         .map(new ScanConvertFunction());
 
     List<String> results = javaRdd.collect();
@@ -283,16 +279,14 @@ public void testBulkGet() throws IOException {
     list.add(Bytes.toBytes("4"));
     list.add(Bytes.toBytes("5"));
 
-    JavaRDD<byte[]> rdd = jsc.parallelize(list);
+    JavaRDD<byte[]> rdd = JSC.parallelize(list);
 
-    Configuration conf = htu.getConfiguration();
+    Configuration conf = TEST_UTIL.getConfiguration();
 
     populateTableWithMockData(conf, TableName.valueOf(tableName));
 
-    JavaHBaseContext hbaseContext = new JavaHBaseContext(jsc, conf);
-
     final JavaRDD<String> stringJavaRDD =
-      hbaseContext.bulkGet(TableName.valueOf(tableName), 2, rdd,
+      HBASE_CONTEXT.bulkGet(TableName.valueOf(tableName), 2, rdd,
         new GetFunction(),
         new ResultFunction());
 
@@ -302,7 +296,7 @@ public void testBulkGet() throws IOException {
   @Test
   public void testBulkLoad() throws Exception {
 
-    Path output = htu.getDataTestDir("testBulkLoad");
+    Path output = TEST_UTIL.getDataTestDir("testBulkLoad");
     // Add cell as String: "row,falmily,qualifier,value"
     List<String> list= new ArrayList<String>();
     // row1
@@ -315,14 +309,11 @@ public void testBulkLoad() throws Exception {
     list.add("2," + columnFamilyStr + ",a,3");
     list.add("2," + columnFamilyStr + ",b,3");
 
-    JavaRDD<String> rdd = jsc.parallelize(list);
-
-    Configuration conf = htu.getConfiguration();
-    JavaHBaseContext hbaseContext = new JavaHBaseContext(jsc, conf);
-
+    JavaRDD<String> rdd = JSC.parallelize(list);
 
+    Configuration conf = TEST_UTIL.getConfiguration();
 
-    hbaseContext.bulkLoad(rdd, TableName.valueOf(tableName), new BulkLoadFunction(),
+    HBASE_CONTEXT.bulkLoad(rdd, TableName.valueOf(tableName), new BulkLoadFunction(),
       output.toUri().getPath(), new HashMap<byte[], FamilyHFileWriteOptions>(), false,
       HConstants.DEFAULT_MAX_FILE_SIZE);
 
@@ -369,7 +360,7 @@ public void testBulkLoad() throws Exception {
 
   @Test
   public void testBulkLoadThinRows() throws Exception {
-    Path output = htu.getDataTestDir("testBulkLoadThinRows");
+    Path output = TEST_UTIL.getDataTestDir("testBulkLoadThinRows");
     // because of the limitation of scala bulkLoadThinRows API
     // we need to provide data as <row, all cells in that row>
     List<List<String>> list= new ArrayList<List<String>>();
@@ -389,12 +380,11 @@ public void testBulkLoadThinRows() throws Exception {
     list2.add("2," + columnFamilyStr + ",b,3");
     list.add(list2);
 
-    JavaRDD<List<String>> rdd = jsc.parallelize(list);
+    JavaRDD<List<String>> rdd = JSC.parallelize(list);
 
-    Configuration conf = htu.getConfiguration();
-    JavaHBaseContext hbaseContext = new JavaHBaseContext(jsc, conf);
+    Configuration conf = TEST_UTIL.getConfiguration();
 
-    hbaseContext.bulkLoadThinRows(rdd, TableName.valueOf(tableName), new BulkLoadThinRowsFunction(),
+    HBASE_CONTEXT.bulkLoadThinRows(rdd, TableName.valueOf(tableName), new BulkLoadThinRowsFunction(),
       output.toString(), new HashMap<byte[], FamilyHFileWriteOptions>(), false,
       HConstants.DEFAULT_MAX_FILE_SIZE);
 

spark/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/HBaseContextSuite.scala

+4 -7

@@ -27,6 +27,7 @@ class HBaseContextSuite extends FunSuite with
 BeforeAndAfterEach with BeforeAndAfterAll with Logging {
 
   @transient var sc: SparkContext = null
+  var hbaseContext: HBaseContext = null
   var TEST_UTIL = new HBaseTestingUtility
 
   val tableName = "t1"
@@ -49,6 +50,9 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging {
     val envMap = Map[String,String](("Xmx", "512m"))
 
     sc = new SparkContext("local", "test", null, Nil, envMap)
+
+    val config = TEST_UTIL.getConfiguration
+    hbaseContext = new HBaseContext(sc, config)
   }
 
   override def afterAll() {
@@ -73,7 +77,6 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging {
       (Bytes.toBytes("5"),
         Array((Bytes.toBytes(columnFamily), Bytes.toBytes("e"), Bytes.toBytes("bar"))))))
 
-    val hbaseContext = new HBaseContext(sc, config)
     hbaseContext.bulkPut[(Array[Byte], Array[(Array[Byte], Array[Byte], Array[Byte])])](rdd,
       TableName.valueOf(tableName),
       (putRecord) => {
@@ -132,7 +135,6 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging {
       Bytes.toBytes("delete1"),
       Bytes.toBytes("delete3")))
 
-    val hbaseContext = new HBaseContext(sc, config)
     hbaseContext.bulkDelete[Array[Byte]](rdd,
       TableName.valueOf(tableName),
       putRecord => new Delete(putRecord),
@@ -174,7 +176,6 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging {
       Bytes.toBytes("get2"),
      Bytes.toBytes("get3"),
      Bytes.toBytes("get4")))
-    val hbaseContext = new HBaseContext(sc, config)
 
     val getRdd = hbaseContext.bulkGet[Array[Byte], String](
       TableName.valueOf(tableName),
@@ -221,7 +222,6 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging {
      Bytes.toBytes("get2"),
      Bytes.toBytes("get3"),
      Bytes.toBytes("get4")))
-    val hbaseContext = new HBaseContext(sc, config)
 
     intercept[SparkException] {
       try {
@@ -274,7 +274,6 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging {
      Bytes.toBytes("get2"),
      Bytes.toBytes("get3"),
      Bytes.toBytes("get4")))
-    val hbaseContext = new HBaseContext(sc, config)
 
     val getRdd = hbaseContext.bulkGet[Array[Byte], String](
       TableName.valueOf(tableName),
@@ -329,8 +328,6 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging {
       connection.close()
     }
 
-    val hbaseContext = new HBaseContext(sc, config)
-
     val scan = new Scan()
     val filter = new FirstKeyOnlyFilter()
     scan.setCaching(100)
