Skip to content

Commit c99dfd1

Browse files
authoredNov 9, 2022
Use resultFormat field to properly process query results (#92)
Signed-off-by: Levko Kravets <levko.ne@gmail.com> Signed-off-by: Levko Kravets <levko.ne@gmail.com>
·
1.11.01.1.0
1 parent e57e017 commit c99dfd1

File tree

9 files changed

+106
-75
lines changed

9 files changed

+106
-75
lines changed
 

‎lib/DBSQLClient.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ export default class DBSQLClient extends EventEmitter implements IDBSQLClient {
5656

5757
private thrift = thrift;
5858

59-
constructor(options: ClientOptions) {
59+
constructor(options?: ClientOptions) {
6060
super();
6161
this.connectionProvider = new HttpConnection();
6262
this.authProvider = new NoSaslAuthentication();

‎lib/DBSQLOperation/SchemaHelper.ts

Lines changed: 26 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
1-
import { TOperationHandle, TGetResultSetMetadataResp } from '../../thrift/TCLIService_types';
1+
import { TOperationHandle, TGetResultSetMetadataResp, TSparkRowSetType } from '../../thrift/TCLIService_types';
22
import HiveDriver from '../hive/HiveDriver';
33
import StatusFactory from '../factory/StatusFactory';
4+
import IOperationResult from '../result/IOperationResult';
5+
import JsonResult from '../result/JsonResult';
6+
import HiveDriverError from '../errors/HiveDriverError';
47
import { definedOrError } from '../utils';
58

69
export default class SchemaHelper {
@@ -10,15 +13,15 @@ export default class SchemaHelper {
1013

1114
private statusFactory = new StatusFactory();
1215

13-
private metadata: TGetResultSetMetadataResp | null = null;
16+
private metadata?: TGetResultSetMetadataResp;
1417

1518
constructor(driver: HiveDriver, operationHandle: TOperationHandle, metadata?: TGetResultSetMetadataResp) {
1619
this.driver = driver;
1720
this.operationHandle = operationHandle;
18-
this.metadata = metadata || null;
21+
this.metadata = metadata;
1922
}
2023

21-
async fetch() {
24+
private async fetchMetadata() {
2225
if (!this.metadata) {
2326
const metadata = await this.driver.getResultSetMetadata({
2427
operationHandle: this.operationHandle,
@@ -27,6 +30,24 @@ export default class SchemaHelper {
2730
this.metadata = metadata;
2831
}
2932

30-
return definedOrError(this.metadata.schema);
33+
return this.metadata;
34+
}
35+
36+
async fetch() {
37+
const metadata = await this.fetchMetadata();
38+
return definedOrError(metadata.schema);
39+
}
40+
41+
async getResultHandler(): Promise<IOperationResult> {
42+
const metadata = await this.fetchMetadata();
43+
const schema = definedOrError(metadata.schema);
44+
const resultFormat = definedOrError(metadata.resultFormat);
45+
46+
switch (resultFormat) {
47+
case TSparkRowSetType.COLUMN_BASED_SET:
48+
return new JsonResult(schema);
49+
default:
50+
throw new HiveDriverError(`Unsupported result format: ${TSparkRowSetType[resultFormat]}`);
51+
}
3152
}
3253
}

‎lib/DBSQLOperation/getResult.ts

Lines changed: 0 additions & 16 deletions
This file was deleted.

‎lib/DBSQLOperation/index.ts

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@ import {
99
} from '../../thrift/TCLIService_types';
1010
import Status from '../dto/Status';
1111

12-
import getResult from './getResult';
1312
import OperationStatusHelper from './OperationStatusHelper';
1413
import SchemaHelper from './SchemaHelper';
1514
import FetchResultsHelper from './FetchResultsHelper';
@@ -94,9 +93,9 @@ export default class DBSQLOperation implements IOperation {
9493

9594
await this._status.waitUntilReady(options);
9695

97-
return Promise.all([this._schema.fetch(), this._data.fetch(options?.maxRows || defaultMaxRows)]).then(
98-
([schema, data]) => {
99-
const result = getResult(schema, data ? [data] : []);
96+
return Promise.all([this._schema.getResultHandler(), this._data.fetch(options?.maxRows || defaultMaxRows)]).then(
97+
([resultHandler, data]) => {
98+
const result = resultHandler.getValue(data ? [data] : []);
10099
this.logger?.log(
101100
LogLevel.debug,
102101
`Fetched chunk of size: ${options?.maxRows || defaultMaxRows} from operation with id: ${this.getId()}`,

‎lib/result/IOperationResult.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
import { TRowSet } from '../../thrift/TCLIService_types';
2+
13
export default interface IOperationResult {
2-
getValue(): any;
4+
getValue(data?: Array<TRowSet>): any;
35
}

‎lib/result/JsonResult.ts

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -10,23 +10,20 @@ import {
1010
import IOperationResult from './IOperationResult';
1111

1212
export default class JsonResult implements IOperationResult {
13-
private readonly schema: TTableSchema | null;
13+
private readonly schema?: TTableSchema;
1414

15-
private readonly data: Array<TRowSet> | null;
16-
17-
constructor(schema: TTableSchema | null, data: Array<TRowSet>) {
15+
constructor(schema?: TTableSchema) {
1816
this.schema = schema;
19-
this.data = data;
2017
}
2118

22-
getValue(): Array<object> {
23-
if (!this.data) {
19+
getValue(data?: Array<TRowSet>): Array<object> {
20+
if (!data) {
2421
return [];
2522
}
2623

2724
const descriptors = this.getSchemaColumns();
2825

29-
return this.data.reduce((result: Array<any>, rowSet: TRowSet) => {
26+
return data.reduce((result: Array<any>, rowSet: TRowSet) => {
3027
const columns = rowSet.columns || [];
3128
const rows = this.getRows(columns, descriptors);
3229

‎lib/result/NullResult.ts

Lines changed: 0 additions & 7 deletions
This file was deleted.

‎tests/unit/DBSQLOperation.test.js

Lines changed: 32 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,11 @@
11
const { expect, AssertionError } = require('chai');
22
const sinon = require('sinon');
33
const { DBSQLLogger, LogLevel } = require('../../dist');
4-
const { TStatusCode, TOperationState, TTypeId } = require('../../thrift/TCLIService_types');
4+
const { TStatusCode, TOperationState, TTypeId, TSparkRowSetType } = require('../../thrift/TCLIService_types');
55
const DBSQLOperation = require('../../dist/DBSQLOperation').default;
66
const StatusError = require('../../dist/errors/StatusError').default;
77
const OperationStateError = require('../../dist/errors/OperationStateError').default;
8-
const getResult = require('../../dist/DBSQLOperation/getResult').default;
8+
const HiveDriverError = require('../../dist/errors/HiveDriverError').default;
99

1010
// Create logger that won't emit
1111
//
@@ -27,6 +27,7 @@ class DriverMock {
2727

2828
getResultSetMetadataResp = {
2929
status: { statusCode: TStatusCode.SUCCESS_STATUS },
30+
resultFormat: TSparkRowSetType.COLUMN_BASED_SET,
3031
schema: {
3132
columns: [
3233
{
@@ -580,14 +581,14 @@ describe('DBSQLOperation', () => {
580581
return driver.getOperationStatus.wrappedMethod.apply(driver, args);
581582
});
582583

583-
driver.getResultSetMetadataResp.schema = null;
584+
driver.getResultSetMetadataResp.schema = { columns: [] };
584585

585586
const operation = new DBSQLOperation(driver, handle, logger);
586587

587588
const schema = await operation.getSchema();
588589

589590
expect(driver.getOperationStatus.called).to.be.true;
590-
expect(schema).to.be.null;
591+
expect(schema).to.deep.equal(driver.getResultSetMetadataResp.schema);
591592
expect(operation._status.state).to.equal(TOperationState.FINISHED_STATE);
592593
});
593594

@@ -604,8 +605,6 @@ describe('DBSQLOperation', () => {
604605
return driver.getOperationStatus.wrappedMethod.apply(driver, args);
605606
});
606607

607-
driver.getResultSetMetadataResp.schema = null;
608-
609608
const operation = new DBSQLOperation(driver, handle, logger);
610609
await operation.getSchema({ progress: true });
611610

@@ -629,8 +628,6 @@ describe('DBSQLOperation', () => {
629628
return driver.getOperationStatus.wrappedMethod.apply(driver, args);
630629
});
631630

632-
driver.getResultSetMetadataResp.schema = null;
633-
634631
const operation = new DBSQLOperation(driver, handle, logger);
635632

636633
const callback = sinon.stub();
@@ -753,7 +750,7 @@ describe('DBSQLOperation', () => {
753750
return driver.getOperationStatus.wrappedMethod.apply(driver, args);
754751
});
755752

756-
driver.getResultSetMetadataResp.schema = null;
753+
driver.getResultSetMetadataResp.schema = { columns: [] };
757754
driver.fetchResultsResp.hasMoreRows = false;
758755
driver.fetchResultsResp.results.columns = [];
759756

@@ -762,7 +759,7 @@ describe('DBSQLOperation', () => {
762759
const results = await operation.fetchChunk();
763760

764761
expect(driver.getOperationStatus.called).to.be.true;
765-
expect(results).to.be.null;
762+
expect(results).to.deep.equal([]);
766763
expect(operation._status.state).to.equal(TOperationState.FINISHED_STATE);
767764
});
768765

@@ -779,7 +776,7 @@ describe('DBSQLOperation', () => {
779776
return driver.getOperationStatus.wrappedMethod.apply(driver, args);
780777
});
781778

782-
driver.getResultSetMetadataResp.schema = null;
779+
driver.getResultSetMetadataResp.schema = { columns: [] };
783780
driver.fetchResultsResp.hasMoreRows = false;
784781
driver.fetchResultsResp.results.columns = [];
785782

@@ -806,7 +803,7 @@ describe('DBSQLOperation', () => {
806803
return driver.getOperationStatus.wrappedMethod.apply(driver, args);
807804
});
808805

809-
driver.getResultSetMetadataResp.schema = null;
806+
driver.getResultSetMetadataResp.schema = { columns: [] };
810807
driver.fetchResultsResp.hasMoreRows = false;
811808
driver.fetchResultsResp.results.columns = [];
812809

@@ -908,6 +905,29 @@ describe('DBSQLOperation', () => {
908905
expect(driver.getResultSetMetadata.callCount).to.be.eq(1);
909906
expect(driver.fetchResults.callCount).to.be.eq(1);
910907
});
908+
909+
it('should fail on unsupported result format', async () => {
910+
const handle = new OperationHandleMock();
911+
handle.hasResultSet = true;
912+
913+
const driver = new DriverMock();
914+
driver.getOperationStatusResp.operationState = TOperationState.FINISHED_STATE;
915+
916+
driver.getResultSetMetadataResp.resultFormat = TSparkRowSetType.ROW_BASED_SET;
917+
driver.getResultSetMetadataResp.schema = { columns: [] };
918+
919+
const operation = new DBSQLOperation(driver, handle, logger);
920+
921+
try {
922+
await operation.fetchChunk();
923+
expect.fail('It should throw a HiveDriverError');
924+
} catch (e) {
925+
if (e instanceof AssertionError) {
926+
throw e;
927+
}
928+
expect(e).to.be.instanceOf(HiveDriverError);
929+
}
930+
});
911931
});
912932

913933
describe('fetchAll', () => {
@@ -1052,16 +1072,4 @@ describe('DBSQLOperation', () => {
10521072
expect(await operation.hasMoreRows()).to.be.false;
10531073
});
10541074
});
1055-
1056-
describe('getResult', () => {
1057-
it('should return null result', () => {
1058-
const t = getResult(null, []);
1059-
expect(t).to.equal(null);
1060-
});
1061-
1062-
it('should return json result', () => {
1063-
const t = getResult({ columns: [] }, []);
1064-
expect(t).to.deep.equal([]);
1065-
});
1066-
});
10671075
});

‎tests/unit/result/JsonResult.test.js

Lines changed: 36 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -101,9 +101,9 @@ describe('JsonResult', () => {
101101
},
102102
];
103103

104-
const result = new JsonResult(schema, data);
104+
const result = new JsonResult(schema);
105105

106-
expect(result.getValue()).to.be.deep.eq([
106+
expect(result.getValue(data)).to.be.deep.eq([
107107
{
108108
'table.str': 'a',
109109
'table.int64': 282578800148737,
@@ -171,9 +171,9 @@ describe('JsonResult', () => {
171171
},
172172
];
173173

174-
const result = new JsonResult(schema, data);
174+
const result = new JsonResult(schema);
175175

176-
expect(result.getValue()).to.be.deep.eq([
176+
expect(result.getValue(data)).to.be.deep.eq([
177177
{
178178
'table.array': ['a', 'b'],
179179
'table.map': { key: 12 },
@@ -210,9 +210,9 @@ describe('JsonResult', () => {
210210
},
211211
];
212212

213-
const result = new JsonResult(schema, data);
213+
const result = new JsonResult(schema);
214214

215-
expect(result.getValue()).to.be.deep.eq([
215+
expect(result.getValue(data)).to.be.deep.eq([
216216
{ 'table.id': '0' },
217217
{ 'table.id': '1' },
218218
{ 'table.id': '2' },
@@ -221,7 +221,7 @@ describe('JsonResult', () => {
221221
});
222222

223223
it('should detect nulls', () => {
224-
const result = new JsonResult(null, []);
224+
const result = new JsonResult(null);
225225
const buf = Buffer.from([0x55, 0xaa, 0xc3]);
226226

227227
[
@@ -333,9 +333,9 @@ describe('JsonResult', () => {
333333
},
334334
];
335335

336-
const result = new JsonResult(schema, data);
336+
const result = new JsonResult(schema);
337337

338-
expect(result.getValue()).to.be.deep.eq([
338+
expect(result.getValue(data)).to.be.deep.eq([
339339
{
340340
'table.str': null,
341341
'table.int64': null,
@@ -356,4 +356,31 @@ describe('JsonResult', () => {
356356
},
357357
]);
358358
});
359+
360+
it('should return empty array if no data to process', () => {
361+
const schema = {
362+
columns: [getColumnSchema('table.id', TCLIService_types.TTypeId.STRING_TYPE, 1)],
363+
};
364+
365+
const result = new JsonResult(schema);
366+
367+
expect(result.getValue()).to.be.deep.eq([]);
368+
expect(result.getValue([])).to.be.deep.eq([]);
369+
});
370+
371+
it('should return empty array if no schema available', () => {
372+
const data = [
373+
{
374+
columns: [
375+
{
376+
stringVal: { values: ['0', '1'] },
377+
},
378+
],
379+
},
380+
];
381+
382+
const result = new JsonResult();
383+
384+
expect(result.getValue(data)).to.be.deep.eq([]);
385+
});
359386
});

0 commit comments

Comments
 (0)
Please sign in to comment.