Skip to content

Commit cd802bf

Browse files
tovbinmRoma-Kyrnis
authored andcommitted
FAI-441 - Added revisions & users streams to Phabricator source + bump version (#113)
1 parent d6570d6 commit cd802bf

25 files changed

+1781
-136
lines changed

destinations/faros-destination/package.json

+2-2
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "faros-destination",
3-
"version": "0.1.21",
3+
"version": "0.1.22",
44
"private": true,
55
"description": "Faros Destination for Airbyte",
66
"keywords": [
@@ -30,7 +30,7 @@
3030
"watch": "tsc -b -w src test"
3131
},
3232
"dependencies": {
33-
"faros-airbyte-cdk": "^0.1.21",
33+
"faros-airbyte-cdk": "^0.1.22",
3434
"faros-feeds-sdk": "^0.8.30",
3535
"jsonata": "^1.8.5",
3636
"object-sizeof": "^1.6.1",

faros-airbyte-cdk/package.json

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "faros-airbyte-cdk",
3-
"version": "0.1.21",
3+
"version": "0.1.22",
44
"description": "Airbyte Connector Development Kit (CDK) for JavaScript/TypeScript",
55
"keywords": [
66
"airbyte",

lerna.json

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
{
2-
"version": "0.1.21",
2+
"version": "0.1.22",
33
"packages": [
44
"faros-airbyte-cdk",
55
"destinations/**",

package-lock.json

+11
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

+1-1
Original file line numberDiff line numberDiff line change
@@ -37,4 +37,4 @@
3737
"ts-node": "^10.2.1",
3838
"typescript": "^4.4.3"
3939
}
40-
}
40+
}

sources/example-source/package.json

+2-2
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "example-source",
3-
"version": "0.1.21",
3+
"version": "0.1.22",
44
"description": "Example Airbyte source",
55
"keywords": [
66
"airbyte",
@@ -31,7 +31,7 @@
3131
"dependencies": {
3232
"axios": "^0.21.4",
3333
"commander": "^8.2.0",
34-
"faros-airbyte-cdk": "^0.1.21",
34+
"faros-airbyte-cdk": "^0.1.22",
3535
"verror": "^1.10.0"
3636
},
3737
"jest": {

sources/jenkins-source/package.json

+2-2
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "jenkins-source",
3-
"version": "0.1.21",
3+
"version": "0.1.22",
44
"description": "Jenkins Airbyte source",
55
"keywords": [
66
"airbyte",
@@ -31,7 +31,7 @@
3131
},
3232
"dependencies": {
3333
"axios": "^0.21.4",
34-
"faros-airbyte-cdk": "^0.1.21",
34+
"faros-airbyte-cdk": "^0.1.22",
3535
"jenkins": "^0.28.1",
3636
"typescript-memoize": "^1.0.1",
3737
"verror": "^1.10.0"

sources/jenkins-source/src/index.ts

+6-4
Original file line numberDiff line numberDiff line change
@@ -26,11 +26,13 @@ export class JenkinsSource extends AirbyteSourceBase {
2626
async checkConnection(
2727
config: JenkinsConfig
2828
): Promise<[boolean, VError | undefined]> {
29-
const [client, errorMessage] = await Jenkins.validateClient(config);
30-
if (client) {
31-
return [true, undefined];
29+
try {
30+
const jenkins = Jenkins.instance(config as JenkinsConfig, this.logger);
31+
await jenkins.checkConnection();
32+
} catch (err: any) {
33+
return [false, err];
3234
}
33-
return [false, new VError(errorMessage)];
35+
return [true, undefined];
3436
}
3537
streams(config: JenkinsConfig): AirbyteStreamBase[] {
3638
return [new Builds(config, this.logger), new Jobs(config, this.logger)];

sources/jenkins-source/src/jenkins.ts

+24-34
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,11 @@
11
import {AirbyteConfig, AirbyteLogger} from 'faros-airbyte-cdk';
2-
import jenkinsClient, {JenkinsPromisifiedAPI} from 'jenkins';
2+
import jenkinsClient from 'jenkins';
33
import {Memoize} from 'typescript-memoize';
44
import {URL} from 'url';
55
import util from 'util';
6+
import {VError} from 'verror';
67

7-
export const DEFAULT_PAGE_SIZE = 10;
8+
const DEFAULT_PAGE_SIZE = 10;
89
const FEED_ALL_FIELDS_PATTERN = `name,fullName,url,lastCompletedBuild[number],%s[id,displayName,number,building,result,timestamp,duration,url,actions[lastBuiltRevision[SHA1],remoteUrls],fullName,fullDisplayName],jobs[*]`;
910
const FEED_JOBS_COUNT_PATTERN = 'jobs[name]';
1011
const FEED_MAX_DEPTH_CALC_PATTERN = 'fullName,jobs[*]';
@@ -51,16 +52,16 @@ export interface JenkinsState {
5152

5253
export class Jenkins {
5354
constructor(
54-
private readonly client: any,
55+
private readonly client: any, // It should be 'JenkinsPromisifiedAPI' instead of any, but we could not make it work
5556
private readonly logger: AirbyteLogger
5657
) {}
5758

58-
static parse(str: string, ...args: any[]): string {
59+
private static parse(str: string, ...args: any[]): string {
5960
let i = 0;
6061
return str.replace(/%s/g, () => args[i++]);
6162
}
6263

63-
static validateInteger(
64+
private static validateInteger(
6465
value: number
6566
): [true | undefined, string | undefined] {
6667
if (value) {
@@ -72,44 +73,30 @@ export class Jenkins {
7273
return [true, undefined];
7374
}
7475

75-
static async make(
76-
config: JenkinsConfig,
77-
logger: AirbyteLogger
78-
): Promise<Jenkins | undefined> {
79-
const [client, errorMessage] = await Jenkins.validateClient(config);
80-
if (!client) {
81-
logger.error(errorMessage || '');
82-
return undefined;
83-
}
84-
return new Jenkins(client, logger);
85-
}
86-
87-
static async validateClient(
88-
config: JenkinsConfig
89-
): Promise<[JenkinsPromisifiedAPI | undefined, string | undefined]> {
76+
static instance(config: JenkinsConfig, logger: AirbyteLogger): Jenkins {
9077
if (typeof config.server_url !== 'string') {
91-
return [undefined, 'server_url: must be a string'];
78+
throw new VError('server_url: must be a string');
9279
}
9380
if (typeof config.user !== 'string') {
94-
return [undefined, 'User: must be a string'];
81+
throw new VError('user: must be a string');
9582
}
9683
if (typeof config.token !== 'string') {
97-
return [undefined, 'Token: must be a string'];
84+
throw new VError('token: must be a string');
9885
}
9986
const depthCheck = Jenkins.validateInteger(config.depth);
10087
if (!depthCheck[0]) {
101-
return [undefined, depthCheck[1]];
88+
throw new VError(depthCheck[1]);
10289
}
10390
const pageSizeCheck = Jenkins.validateInteger(config.pageSize);
10491
if (!pageSizeCheck[0]) {
105-
return [undefined, pageSizeCheck[1]];
92+
throw new VError(pageSizeCheck[1]);
10693
}
10794

108-
let jenkinsUrl;
95+
let jenkinsUrl: URL;
10996
try {
11097
jenkinsUrl = new URL(config.server_url);
11198
} catch (error) {
112-
return [undefined, 'server_url: must be a valid url'];
99+
throw new VError('server_url: must be a valid url');
113100
}
114101

115102
jenkinsUrl.username = config.user;
@@ -121,15 +108,18 @@ export class Jenkins {
121108
promisify: true,
122109
});
123110

111+
return new Jenkins(client, logger);
112+
}
113+
114+
async checkConnection(): Promise<void> {
124115
try {
125-
await client.info();
126-
} catch (error) {
127-
return [
128-
undefined,
129-
'Please verify your server_url and user/token are correct',
130-
];
116+
await this.client.info();
117+
} catch (error: any) {
118+
const err = error?.message ?? JSON.stringify(error);
119+
throw new VError(
120+
`Please verify your server_url and user/token are correct. Error: ${err}`
121+
);
131122
}
132-
return [client, undefined];
133123
}
134124

135125
async *syncBuilds(

sources/jenkins-source/src/streams/builds.ts

+5-20
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,7 @@ import {
66
} from 'faros-airbyte-cdk';
77
import {Dictionary} from 'ts-essentials';
88

9-
import {
10-
Build,
11-
DEFAULT_PAGE_SIZE,
12-
Jenkins,
13-
JenkinsConfig,
14-
JenkinsState,
15-
} from '../jenkins';
9+
import {Build, Jenkins, JenkinsConfig, JenkinsState} from '../jenkins';
1610

1711
export class Builds extends AirbyteStreamBase {
1812
constructor(readonly config: JenkinsConfig, logger: AirbyteLogger) {
@@ -28,26 +22,17 @@ export class Builds extends AirbyteStreamBase {
2822
get cursorField(): string | string[] {
2923
return 'number';
3024
}
31-
get stateCheckpointInterval(): number {
32-
return 10 * (this.config.pageSize ?? DEFAULT_PAGE_SIZE);
33-
}
34-
3525
async *readRecords(
3626
syncMode: SyncMode,
3727
cursorField?: string[],
3828
streamSlice?: Build,
3929
streamState?: JenkinsState
4030
): AsyncGenerator<Build, any, any> {
41-
const jenkins = await Jenkins.make(this.config, this.logger);
42-
if (!jenkins) return;
31+
const jenkins = Jenkins.instance(this.config, this.logger);
32+
const state =
33+
syncMode === SyncMode.INCREMENTAL ? streamState ?? null : null;
4334

44-
let iter: AsyncGenerator<Build, any, unknown>;
45-
if (syncMode === SyncMode.INCREMENTAL) {
46-
iter = jenkins.syncBuilds(this.config, streamState ?? null);
47-
} else {
48-
iter = jenkins.syncBuilds(this.config, null);
49-
}
50-
yield* iter;
35+
yield* jenkins.syncBuilds(this.config, state);
5136
}
5237

5338
getUpdatedState(

sources/jenkins-source/src/streams/jobs.ts

+5-12
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ import {
66
} from 'faros-airbyte-cdk';
77
import {Dictionary} from 'ts-essentials';
88

9-
import {DEFAULT_PAGE_SIZE, Jenkins, JenkinsConfig, Job} from '../jenkins';
9+
import {Jenkins, JenkinsConfig, Job} from '../jenkins';
1010

1111
export class Jobs extends AirbyteStreamBase {
1212
constructor(readonly config: JenkinsConfig, logger: AirbyteLogger) {
@@ -22,9 +22,6 @@ export class Jobs extends AirbyteStreamBase {
2222
get cursorField(): string | string[] {
2323
return ['url'];
2424
}
25-
get stateCheckpointInterval(): number {
26-
return 10 * (this.config.pageSize ?? DEFAULT_PAGE_SIZE);
27-
}
2825
async *streamSlices(
2926
syncMode: SyncMode,
3027
cursorField?: string[],
@@ -52,15 +49,11 @@ export class Jobs extends AirbyteStreamBase {
5249
streamSlice?: Job,
5350
streamState?: any
5451
): AsyncGenerator<Job, any, any> {
55-
const jenkins = await Jenkins.make(this.config, this.logger);
56-
if (!jenkins) return;
52+
const jenkins = Jenkins.instance(this.config, this.logger);
53+
const state =
54+
syncMode === SyncMode.INCREMENTAL ? streamSlice || null : null;
5755

58-
let jobs: Job[];
59-
if (syncMode === SyncMode.INCREMENTAL) {
60-
jobs = await jenkins.syncJobs(this.config, streamSlice || null);
61-
} else {
62-
jobs = await jenkins.syncJobs(this.config, null);
63-
}
56+
const jobs: Job[] = await jenkins.syncJobs(this.config, state);
6457
for (const job of jobs) {
6558
yield job;
6659
}

sources/jenkins-source/test/index.test.ts

+15-9
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ describe('index', () => {
2727

2828
test('spec', async () => {
2929
const source = new sut.JenkinsSource(logger);
30-
expect(source.spec()).resolves.toStrictEqual(
30+
await expect(source.spec()).resolves.toStrictEqual(
3131
new AirbyteSpec(readTestResourceFile('spec.json'))
3232
);
3333
});
@@ -40,28 +40,30 @@ describe('index', () => {
4040
info: jest.fn().mockRejectedValue({}),
4141
} as any);
4242
const source = new sut.JenkinsSource(logger);
43-
expect(
43+
await expect(
4444
source.checkConnection({
4545
user: '123',
4646
token: 'token',
4747
server_url: 'http://example.com',
4848
})
4949
).resolves.toStrictEqual([true, undefined]);
50-
expect(
50+
await expect(
5151
source.checkConnection({
5252
user: '123',
5353
token: 'token',
5454
server_url: 'http://example.com',
5555
})
5656
).resolves.toStrictEqual([
5757
false,
58-
new VError('Please verify your server_url and user/token are correct'),
58+
new VError(
59+
'Please verify your server_url and user/token are correct. Error: {}'
60+
),
5961
]);
60-
expect(source.checkConnection({} as any)).resolves.toStrictEqual([
62+
await expect(source.checkConnection({} as any)).resolves.toStrictEqual([
6163
false,
6264
new VError('server_url: must be a string'),
6365
]);
64-
expect(
66+
await expect(
6567
source.checkConnection({server_url: '111', user: '', token: ''})
6668
).resolves.toStrictEqual([
6769
false,
@@ -128,16 +130,20 @@ describe('index', () => {
128130
]);
129131
});
130132

131-
test('streams - return undefined if config not correct', async () => {
133+
test('streams - error out if config not correct', async () => {
132134
mocked(jenkinsClient).mockReturnValue({
133135
info: jest.fn().mockResolvedValue({}),
134136
} as any);
135137
const source = new sut.JenkinsSource(logger);
136138
const [jobStream, buildStream] = source.streams({} as any);
137139
const jobIter = jobStream.readRecords(SyncMode.FULL_REFRESH);
138140
const buildIter = buildStream.readRecords(SyncMode.FULL_REFRESH);
139-
expect((await jobIter.next()).value).toBeUndefined();
140-
expect((await buildIter.next()).value).toBeUndefined();
141+
await expect(jobIter.next()).rejects.toStrictEqual(
142+
new VError('server_url: must be a string')
143+
);
144+
await expect(buildIter.next()).rejects.toStrictEqual(
145+
new VError('server_url: must be a string')
146+
);
141147
});
142148

143149
test('streams - builds, use incremental sync mode', async () => {

0 commit comments

Comments
 (0)