Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FAI-441 - Added revisions & users streams to Phabricator source + bump version #113

Merged
merged 8 commits into from
Oct 1, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions destinations/faros-destination/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "faros-destination",
"version": "0.1.21",
"version": "0.1.22",
"private": true,
"description": "Faros Destination for Airbyte",
"keywords": [
Expand Down Expand Up @@ -30,7 +30,7 @@
"watch": "tsc -b -w src test"
},
"dependencies": {
"faros-airbyte-cdk": "^0.1.21",
"faros-airbyte-cdk": "^0.1.22",
"faros-feeds-sdk": "^0.8.30",
"jsonata": "^1.8.5",
"object-sizeof": "^1.6.1",
Expand Down
2 changes: 1 addition & 1 deletion faros-airbyte-cdk/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "faros-airbyte-cdk",
"version": "0.1.21",
"version": "0.1.22",
"description": "Airbyte Connector Development Kit (CDK) for JavaScript/TypeScript",
"keywords": [
"airbyte",
Expand Down
2 changes: 1 addition & 1 deletion lerna.json
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{
"version": "0.1.21",
"version": "0.1.22",
"packages": [
"faros-airbyte-cdk",
"destinations/**",
Expand Down
11 changes: 11 additions & 0 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -37,4 +37,4 @@
"ts-node": "^10.2.1",
"typescript": "^4.4.3"
}
}
}
4 changes: 2 additions & 2 deletions sources/example-source/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "example-source",
"version": "0.1.21",
"version": "0.1.22",
"description": "Example Airbyte source",
"keywords": [
"airbyte",
Expand Down Expand Up @@ -31,7 +31,7 @@
"dependencies": {
"axios": "^0.21.4",
"commander": "^8.2.0",
"faros-airbyte-cdk": "^0.1.21",
"faros-airbyte-cdk": "^0.1.22",
"verror": "^1.10.0"
},
"jest": {
Expand Down
4 changes: 2 additions & 2 deletions sources/jenkins-source/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "jenkins-source",
"version": "0.1.21",
"version": "0.1.22",
"description": "Jenkins Airbyte source",
"keywords": [
"airbyte",
Expand Down Expand Up @@ -31,7 +31,7 @@
},
"dependencies": {
"axios": "^0.21.4",
"faros-airbyte-cdk": "^0.1.21",
"faros-airbyte-cdk": "^0.1.22",
"jenkins": "^0.28.1",
"typescript-memoize": "^1.0.1",
"verror": "^1.10.0"
Expand Down
10 changes: 6 additions & 4 deletions sources/jenkins-source/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,13 @@ export class JenkinsSource extends AirbyteSourceBase {
async checkConnection(
config: JenkinsConfig
): Promise<[boolean, VError | undefined]> {
const [client, errorMessage] = await Jenkins.validateClient(config);
if (client) {
return [true, undefined];
try {
const jenkins = Jenkins.instance(config as JenkinsConfig, this.logger);
await jenkins.checkConnection();
} catch (err: any) {
return [false, err];
}
return [false, new VError(errorMessage)];
return [true, undefined];
}
streams(config: JenkinsConfig): AirbyteStreamBase[] {
return [new Builds(config, this.logger), new Jobs(config, this.logger)];
Expand Down
58 changes: 24 additions & 34 deletions sources/jenkins-source/src/jenkins.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
import {AirbyteConfig, AirbyteLogger} from 'faros-airbyte-cdk';
import jenkinsClient, {JenkinsPromisifiedAPI} from 'jenkins';
import jenkinsClient from 'jenkins';
import {Memoize} from 'typescript-memoize';
import {URL} from 'url';
import util from 'util';
import {VError} from 'verror';

export const DEFAULT_PAGE_SIZE = 10;
const DEFAULT_PAGE_SIZE = 10;
const FEED_ALL_FIELDS_PATTERN = `name,fullName,url,lastCompletedBuild[number],%s[id,displayName,number,building,result,timestamp,duration,url,actions[lastBuiltRevision[SHA1],remoteUrls],fullName,fullDisplayName],jobs[*]`;
const FEED_JOBS_COUNT_PATTERN = 'jobs[name]';
const FEED_MAX_DEPTH_CALC_PATTERN = 'fullName,jobs[*]';
Expand Down Expand Up @@ -51,16 +52,16 @@ export interface JenkinsState {

export class Jenkins {
constructor(
private readonly client: any,
private readonly client: any, // It should be 'JenkinsPromisifiedAPI' instead of any, but we could not make it work
private readonly logger: AirbyteLogger
) {}

static parse(str: string, ...args: any[]): string {
private static parse(str: string, ...args: any[]): string {
let i = 0;
return str.replace(/%s/g, () => args[i++]);
}

static validateInteger(
private static validateInteger(
value: number
): [true | undefined, string | undefined] {
if (value) {
Expand All @@ -72,44 +73,30 @@ export class Jenkins {
return [true, undefined];
}

static async make(
config: JenkinsConfig,
logger: AirbyteLogger
): Promise<Jenkins | undefined> {
const [client, errorMessage] = await Jenkins.validateClient(config);
if (!client) {
logger.error(errorMessage || '');
return undefined;
}
return new Jenkins(client, logger);
}

static async validateClient(
config: JenkinsConfig
): Promise<[JenkinsPromisifiedAPI | undefined, string | undefined]> {
static instance(config: JenkinsConfig, logger: AirbyteLogger): Jenkins {
if (typeof config.server_url !== 'string') {
return [undefined, 'server_url: must be a string'];
throw new VError('server_url: must be a string');
}
if (typeof config.user !== 'string') {
return [undefined, 'User: must be a string'];
throw new VError('user: must be a string');
}
if (typeof config.token !== 'string') {
return [undefined, 'Token: must be a string'];
throw new VError('token: must be a string');
}
const depthCheck = Jenkins.validateInteger(config.depth);
if (!depthCheck[0]) {
return [undefined, depthCheck[1]];
throw new VError(depthCheck[1]);
}
const pageSizeCheck = Jenkins.validateInteger(config.pageSize);
if (!pageSizeCheck[0]) {
return [undefined, pageSizeCheck[1]];
throw new VError(pageSizeCheck[1]);
}

let jenkinsUrl;
let jenkinsUrl: URL;
try {
jenkinsUrl = new URL(config.server_url);
} catch (error) {
return [undefined, 'server_url: must be a valid url'];
throw new VError('server_url: must be a valid url');
}

jenkinsUrl.username = config.user;
Expand All @@ -121,15 +108,18 @@ export class Jenkins {
promisify: true,
});

return new Jenkins(client, logger);
}

async checkConnection(): Promise<void> {
try {
await client.info();
} catch (error) {
return [
undefined,
'Please verify your server_url and user/token are correct',
];
await this.client.info();
} catch (error: any) {
const err = error?.message ?? JSON.stringify(error);
throw new VError(
`Please verify your server_url and user/token are correct. Error: ${err}`
);
}
return [client, undefined];
}

async *syncBuilds(
Expand Down
25 changes: 5 additions & 20 deletions sources/jenkins-source/src/streams/builds.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,7 @@ import {
} from 'faros-airbyte-cdk';
import {Dictionary} from 'ts-essentials';

import {
Build,
DEFAULT_PAGE_SIZE,
Jenkins,
JenkinsConfig,
JenkinsState,
} from '../jenkins';
import {Build, Jenkins, JenkinsConfig, JenkinsState} from '../jenkins';

export class Builds extends AirbyteStreamBase {
constructor(readonly config: JenkinsConfig, logger: AirbyteLogger) {
Expand All @@ -28,26 +22,17 @@ export class Builds extends AirbyteStreamBase {
get cursorField(): string | string[] {
return 'number';
}
get stateCheckpointInterval(): number {
return 10 * (this.config.pageSize ?? DEFAULT_PAGE_SIZE);
}

async *readRecords(
syncMode: SyncMode,
cursorField?: string[],
streamSlice?: Build,
streamState?: JenkinsState
): AsyncGenerator<Build, any, any> {
const jenkins = await Jenkins.make(this.config, this.logger);
if (!jenkins) return;
const jenkins = Jenkins.instance(this.config, this.logger);
const state =
syncMode === SyncMode.INCREMENTAL ? streamState ?? null : null;

let iter: AsyncGenerator<Build, any, unknown>;
if (syncMode === SyncMode.INCREMENTAL) {
iter = jenkins.syncBuilds(this.config, streamState ?? null);
} else {
iter = jenkins.syncBuilds(this.config, null);
}
yield* iter;
yield* jenkins.syncBuilds(this.config, state);
}

getUpdatedState(
Expand Down
17 changes: 5 additions & 12 deletions sources/jenkins-source/src/streams/jobs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import {
} from 'faros-airbyte-cdk';
import {Dictionary} from 'ts-essentials';

import {DEFAULT_PAGE_SIZE, Jenkins, JenkinsConfig, Job} from '../jenkins';
import {Jenkins, JenkinsConfig, Job} from '../jenkins';

export class Jobs extends AirbyteStreamBase {
constructor(readonly config: JenkinsConfig, logger: AirbyteLogger) {
Expand All @@ -22,9 +22,6 @@ export class Jobs extends AirbyteStreamBase {
get cursorField(): string | string[] {
return ['url'];
}
get stateCheckpointInterval(): number {
return 10 * (this.config.pageSize ?? DEFAULT_PAGE_SIZE);
}
async *streamSlices(
syncMode: SyncMode,
cursorField?: string[],
Expand Down Expand Up @@ -52,15 +49,11 @@ export class Jobs extends AirbyteStreamBase {
streamSlice?: Job,
streamState?: any
): AsyncGenerator<Job, any, any> {
const jenkins = await Jenkins.make(this.config, this.logger);
if (!jenkins) return;
const jenkins = Jenkins.instance(this.config, this.logger);
const state =
syncMode === SyncMode.INCREMENTAL ? streamSlice || null : null;

let jobs: Job[];
if (syncMode === SyncMode.INCREMENTAL) {
jobs = await jenkins.syncJobs(this.config, streamSlice || null);
} else {
jobs = await jenkins.syncJobs(this.config, null);
}
const jobs: Job[] = await jenkins.syncJobs(this.config, state);
for (const job of jobs) {
yield job;
}
Expand Down
24 changes: 15 additions & 9 deletions sources/jenkins-source/test/index.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ describe('index', () => {

test('spec', async () => {
const source = new sut.JenkinsSource(logger);
expect(source.spec()).resolves.toStrictEqual(
await expect(source.spec()).resolves.toStrictEqual(
new AirbyteSpec(readTestResourceFile('spec.json'))
);
});
Expand All @@ -40,28 +40,30 @@ describe('index', () => {
info: jest.fn().mockRejectedValue({}),
} as any);
const source = new sut.JenkinsSource(logger);
expect(
await expect(
source.checkConnection({
user: '123',
token: 'token',
server_url: 'http://example.com',
})
).resolves.toStrictEqual([true, undefined]);
expect(
await expect(
source.checkConnection({
user: '123',
token: 'token',
server_url: 'http://example.com',
})
).resolves.toStrictEqual([
false,
new VError('Please verify your server_url and user/token are correct'),
new VError(
'Please verify your server_url and user/token are correct. Error: {}'
),
]);
expect(source.checkConnection({} as any)).resolves.toStrictEqual([
await expect(source.checkConnection({} as any)).resolves.toStrictEqual([
false,
new VError('server_url: must be a string'),
]);
expect(
await expect(
source.checkConnection({server_url: '111', user: '', token: ''})
).resolves.toStrictEqual([
false,
Expand Down Expand Up @@ -128,16 +130,20 @@ describe('index', () => {
]);
});

test('streams - return undefined if config not correct', async () => {
test('streams - error out if config not correct', async () => {
mocked(jenkinsClient).mockReturnValue({
info: jest.fn().mockResolvedValue({}),
} as any);
const source = new sut.JenkinsSource(logger);
const [jobStream, buildStream] = source.streams({} as any);
const jobIter = jobStream.readRecords(SyncMode.FULL_REFRESH);
const buildIter = buildStream.readRecords(SyncMode.FULL_REFRESH);
expect((await jobIter.next()).value).toBeUndefined();
expect((await buildIter.next()).value).toBeUndefined();
await expect(jobIter.next()).rejects.toStrictEqual(
new VError('server_url: must be a string')
);
await expect(buildIter.next()).rejects.toStrictEqual(
new VError('server_url: must be a string')
);
});

test('streams - builds, use incremental sync mode', async () => {
Expand Down
Loading