diff --git a/faros-airbyte-cdk/src/sources/source-base.ts b/faros-airbyte-cdk/src/sources/source-base.ts index 2d06ec89d..4a481d333 100644 --- a/faros-airbyte-cdk/src/sources/source-base.ts +++ b/faros-airbyte-cdk/src/sources/source-base.ts @@ -160,7 +160,7 @@ export abstract class AirbyteSourceBase extends AirbyteSource { let recordCounter = 0; const streamName = configuredStream.stream.name; const mode = useIncremental ? 'incremental' : 'full'; - this.logger.info(`Syncing stream ${streamName} in ${mode} mode`); + this.logger.info(`Syncing ${streamName} stream in ${mode} mode`); for await (const record of recordGenerator) { if (record.type === AirbyteMessageType.RECORD) { @@ -168,7 +168,9 @@ export abstract class AirbyteSourceBase extends AirbyteSource { } yield record; } - this.logger.info(`Read ${recordCounter} records from ${streamName} stream`); + this.logger.info( + `Finished syncing ${streamName} stream. Read ${recordCounter} records` + ); } private async *readIncremental( diff --git a/package-lock.json b/package-lock.json index ac6da61c0..7d0bef21a 100644 --- a/package-lock.json +++ b/package-lock.json @@ -9,7 +9,7 @@ "@types/jenkins": "^0.23.2", "axios": "^0.21.4", "commander": "^8.2.0", - "condoit": "^2.0.8", + "condoit": "^2.0.11", "faros-feeds-sdk": "^0.8.30", "fast-redact": "^3.0.2", "jenkins": "^0.28.1", @@ -5484,47 +5484,14 @@ } }, "node_modules/condoit": { - "version": "2.0.8", - "resolved": "https://registry.npmjs.org/condoit/-/condoit-2.0.8.tgz", - "integrity": "sha512-1/0GSnC/LgtT1+PB6l8y4DOyurUiKPOaaf7RFwra7fTom/+ckzDrTZNIvab70mO72Hy0fSPwMscafgAiAYqysg==", - "dependencies": { - "axios": "^0.19.0", - "qs": "^6.9.1" - } - }, - "node_modules/condoit/node_modules/axios": { - "version": "0.19.2", - "resolved": "https://registry.npmjs.org/axios/-/axios-0.19.2.tgz", - "integrity": "sha512-fjgm5MvRHLhx+osE2xoekY70AhARk3a6hkN+3Io1jc00jtquGvxYlKlsFUhmUET0V5te6CcZI7lcv2Ym61mjHA==", - "deprecated": "Critical security vulnerability fixed in v0.21.1. For more information, see https://github.com/axios/axios/pull/3410", - "dependencies": { - "follow-redirects": "1.5.10" - } - }, - "node_modules/condoit/node_modules/debug": { - "version": "3.1.0", - "resolved": "https://registry.npmjs.org/debug/-/debug-3.1.0.tgz", - "integrity": "sha512-OX8XqP7/1a9cqkxYw2yXss15f26NKWBpDXQd0/uK/KPqdQhxbPa994hnzjcE2VqQpDslf55723cKPUOGSmMY3g==", + "version": "2.0.11", + "resolved": "https://registry.npmjs.org/condoit/-/condoit-2.0.11.tgz", + "integrity": "sha512-ic38W/ZSRPU4PrVupMFwjP3nlo9zNctFedjoGFQ528HYHlmjD0hNL2iU/H6V0ju1ZX0/i3VrQfFxtDmDfSV1YQ==", "dependencies": { - "ms": "2.0.0" - } - }, - "node_modules/condoit/node_modules/follow-redirects": { - "version": "1.5.10", - "resolved": "https://registry.npmjs.org/follow-redirects/-/follow-redirects-1.5.10.tgz", - "integrity": "sha512-0V5l4Cizzvqt5D44aTXbFZz+FtyXV1vrDN6qrelxtfYQKW0KO0W2T/hkE8xvGa/540LkZlkaUjO4ailYTFtHVQ==", - "dependencies": { - "debug": "=3.1.0" - }, - "engines": { - "node": ">=4.0" + "axios": "^0.21.4", + "qs": "^6.10.1" } }, - "node_modules/condoit/node_modules/ms": { - "version": "2.0.0", - "resolved": "https://registry.npmjs.org/ms/-/ms-2.0.0.tgz", - "integrity": "sha1-VgiurfwAvmwpAd9fmGF4jeDVl8g=" - }, "node_modules/config-chain": { "version": "1.1.13", "resolved": "https://registry.npmjs.org/config-chain/-/config-chain-1.1.13.tgz", @@ -20955,43 +20922,12 @@ } }, "condoit": { - "version": "2.0.8", - "resolved": "https://registry.npmjs.org/condoit/-/condoit-2.0.8.tgz", - "integrity": "sha512-1/0GSnC/LgtT1+PB6l8y4DOyurUiKPOaaf7RFwra7fTom/+ckzDrTZNIvab70mO72Hy0fSPwMscafgAiAYqysg==", + "version": "2.0.11", + "resolved": "https://registry.npmjs.org/condoit/-/condoit-2.0.11.tgz", + "integrity": "sha512-ic38W/ZSRPU4PrVupMFwjP3nlo9zNctFedjoGFQ528HYHlmjD0hNL2iU/H6V0ju1ZX0/i3VrQfFxtDmDfSV1YQ==", "requires": { - "axios": "^0.19.0", - "qs": "^6.9.1" - }, - "dependencies": { - "axios": { - "version": "0.19.2", - "resolved": "https://registry.npmjs.org/axios/-/axios-0.19.2.tgz", - "integrity": "sha512-fjgm5MvRHLhx+osE2xoekY70AhARk3a6hkN+3Io1jc00jtquGvxYlKlsFUhmUET0V5te6CcZI7lcv2Ym61mjHA==", - "requires": { - "follow-redirects": "1.5.10" - } - }, - "debug": { - "version": "3.1.0", - "resolved": "https://registry.npmjs.org/debug/-/debug-3.1.0.tgz", - "integrity": "sha512-OX8XqP7/1a9cqkxYw2yXss15f26NKWBpDXQd0/uK/KPqdQhxbPa994hnzjcE2VqQpDslf55723cKPUOGSmMY3g==", - "requires": { - "ms": "2.0.0" - } - }, - "follow-redirects": { - "version": "1.5.10", - "resolved": "https://registry.npmjs.org/follow-redirects/-/follow-redirects-1.5.10.tgz", - "integrity": "sha512-0V5l4Cizzvqt5D44aTXbFZz+FtyXV1vrDN6qrelxtfYQKW0KO0W2T/hkE8xvGa/540LkZlkaUjO4ailYTFtHVQ==", - "requires": { - "debug": "=3.1.0" - } - }, - "ms": { - "version": "2.0.0", - "resolved": "https://registry.npmjs.org/ms/-/ms-2.0.0.tgz", - "integrity": "sha1-VgiurfwAvmwpAd9fmGF4jeDVl8g=" - } + "axios": "^0.21.4", + "qs": "^6.10.1" } }, "config-chain": { diff --git a/sources/phabricator-source/package.json b/sources/phabricator-source/package.json index d80fd2ce3..addb4d69d 100644 --- a/sources/phabricator-source/package.json +++ b/sources/phabricator-source/package.json @@ -31,7 +31,7 @@ }, "dependencies": { "axios": "^0.21.4", - "condoit": "^2.0.8", + "condoit": "^2.0.11", "commander": "^8.2.0", "faros-airbyte-cdk": "^0.1.21", "moment": "^2.29.1", diff --git a/sources/phabricator-source/resources/schemas/commits.json b/sources/phabricator-source/resources/schemas/commits.json index 7f195121b..143bc616b 100644 --- a/sources/phabricator-source/resources/schemas/commits.json +++ b/sources/phabricator-source/resources/schemas/commits.json @@ -182,6 +182,1057 @@ "projects", "subscribers" ] + }, + "repository": { + "type": "object", + "properties": { + "id": { + "type": "integer" + }, + "type": { + "type": "string" + }, + "phid": { + "type": "string" + }, + "fields": { + "type": "object", + "properties": { + "name": { + "type": "string" + }, + "vcs": { + "type": "string" + }, + "callsign": { + "type": "null" + }, + "shortName": { + "type": "string" + }, + "status": { + "type": "string" + }, + "isImporting": { + "type": "boolean" + }, + "almanacServicePHID": { + "type": "null" + }, + "refRules": { + "type": "object", + "properties": { + "fetchRules": { + "type": "array", + "items": {} + }, + "trackRules": { + "type": "array", + "items": {} + }, + "permanentRefRules": { + "type": "array", + "items": {} + } + }, + "required": [ + "fetchRules", + "trackRules", + "permanentRefRules" + ] + }, + "defaultBranch": { + "type": "string" + }, + "description": { + "type": "object", + "properties": { + "raw": { + "type": "string" + } + }, + "required": [ + "raw" + ] + }, + "spacePHID": { + "type": "null" + }, + "dateCreated": { + "type": "integer" + }, + "dateModified": { + "type": "integer" + }, + "policy": { + "type": "object", + "properties": { + "view": { + "type": "string" + }, + "edit": { + "type": "string" + }, + "diffusion.push": { + "type": "string" + } + }, + "required": [ + "view", + "edit", + "diffusion.push" + ] + } + }, + "required": [ + "name", + "vcs", + "callsign", + "shortName", + "status", + "isImporting", + "almanacServicePHID", + "refRules", + "defaultBranch", + "description", + "spacePHID", + "dateCreated", + "dateModified", + "policy" + ] + }, + "attachments": { + "type": "object", + "properties": { + "projects": { + "type": "object", + "properties": { + "projectPHIDs": { + "type": "array", + "items": {} + } + }, + "required": [ + "projectPHIDs" + ] + }, + "uris": { + "type": "object", + "properties": { + "uris": { + "type": "array", + "items": [ + { + "type": "object", + "properties": { + "id": { + "type": "string" + }, + "type": { + "type": "string" + }, + "phid": { + "type": "string" + }, + "fields": { + "type": "object", + "properties": { + "repositoryPHID": { + "type": "string" + }, + "uri": { + "type": "object", + "properties": { + "raw": { + "type": "string" + }, + "display": { + "type": "string" + }, + "effective": { + "type": "string" + }, + "normalized": { + "type": "string" + } + }, + "required": [ + "raw", + "display", + "effective", + "normalized" + ] + }, + "io": { + "type": "object", + "properties": { + "raw": { + "type": "string" + }, + "default": { + "type": "string" + }, + "effective": { + "type": "string" + } + }, + "required": [ + "raw", + "default", + "effective" + ] + }, + "display": { + "type": "object", + "properties": { + "raw": { + "type": "string" + }, + "default": { + "type": "string" + }, + "effective": { + "type": "string" + } + }, + "required": [ + "raw", + "default", + "effective" + ] + }, + "credentialPHID": { + "type": "null" + }, + "disabled": { + "type": "boolean" + }, + "builtin": { + "type": "object", + "properties": { + "protocol": { + "type": "string" + }, + "identifier": { + "type": "string" + } + }, + "required": [ + "protocol", + "identifier" + ] + }, + "dateCreated": { + "type": "string" + }, + "dateModified": { + "type": "string" + } + }, + "required": [ + "repositoryPHID", + "uri", + "io", + "display", + "credentialPHID", + "disabled", + "builtin", + "dateCreated", + "dateModified" + ] + } + }, + "required": [ + "id", + "type", + "phid", + "fields" + ] + }, + { + "type": "object", + "properties": { + "id": { + "type": "string" + }, + "type": { + "type": "string" + }, + "phid": { + "type": "string" + }, + "fields": { + "type": "object", + "properties": { + "repositoryPHID": { + "type": "string" + }, + "uri": { + "type": "object", + "properties": { + "raw": { + "type": "string" + }, + "display": { + "type": "string" + }, + "effective": { + "type": "string" + }, + "normalized": { + "type": "string" + } + }, + "required": [ + "raw", + "display", + "effective", + "normalized" + ] + }, + "io": { + "type": "object", + "properties": { + "raw": { + "type": "string" + }, + "default": { + "type": "string" + }, + "effective": { + "type": "string" + } + }, + "required": [ + "raw", + "default", + "effective" + ] + }, + "display": { + "type": "object", + "properties": { + "raw": { + "type": "string" + }, + "default": { + "type": "string" + }, + "effective": { + "type": "string" + } + }, + "required": [ + "raw", + "default", + "effective" + ] + }, + "credentialPHID": { + "type": "null" + }, + "disabled": { + "type": "boolean" + }, + "builtin": { + "type": "object", + "properties": { + "protocol": { + "type": "string" + }, + "identifier": { + "type": "string" + } + }, + "required": [ + "protocol", + "identifier" + ] + }, + "dateCreated": { + "type": "string" + }, + "dateModified": { + "type": "string" + } + }, + "required": [ + "repositoryPHID", + "uri", + "io", + "display", + "credentialPHID", + "disabled", + "builtin", + "dateCreated", + "dateModified" + ] + } + }, + "required": [ + "id", + "type", + "phid", + "fields" + ] + }, + { + "type": "object", + "properties": { + "id": { + "type": "string" + }, + "type": { + "type": "string" + }, + "phid": { + "type": "string" + }, + "fields": { + "type": "object", + "properties": { + "repositoryPHID": { + "type": "string" + }, + "uri": { + "type": "object", + "properties": { + "raw": { + "type": "string" + }, + "display": { + "type": "string" + }, + "effective": { + "type": "string" + }, + "normalized": { + "type": "string" + } + }, + "required": [ + "raw", + "display", + "effective", + "normalized" + ] + }, + "io": { + "type": "object", + "properties": { + "raw": { + "type": "string" + }, + "default": { + "type": "string" + }, + "effective": { + "type": "string" + } + }, + "required": [ + "raw", + "default", + "effective" + ] + }, + "display": { + "type": "object", + "properties": { + "raw": { + "type": "string" + }, + "default": { + "type": "string" + }, + "effective": { + "type": "string" + } + }, + "required": [ + "raw", + "default", + "effective" + ] + }, + "credentialPHID": { + "type": "null" + }, + "disabled": { + "type": "boolean" + }, + "builtin": { + "type": "object", + "properties": { + "protocol": { + "type": "string" + }, + "identifier": { + "type": "string" + } + }, + "required": [ + "protocol", + "identifier" + ] + }, + "dateCreated": { + "type": "string" + }, + "dateModified": { + "type": "string" + } + }, + "required": [ + "repositoryPHID", + "uri", + "io", + "display", + "credentialPHID", + "disabled", + "builtin", + "dateCreated", + "dateModified" + ] + } + }, + "required": [ + "id", + "type", + "phid", + "fields" + ] + }, + { + "type": "object", + "properties": { + "id": { + "type": "string" + }, + "type": { + "type": "string" + }, + "phid": { + "type": "string" + }, + "fields": { + "type": "object", + "properties": { + "repositoryPHID": { + "type": "string" + }, + "uri": { + "type": "object", + "properties": { + "raw": { + "type": "string" + }, + "display": { + "type": "string" + }, + "effective": { + "type": "string" + }, + "normalized": { + "type": "string" + } + }, + "required": [ + "raw", + "display", + "effective", + "normalized" + ] + }, + "io": { + "type": "object", + "properties": { + "raw": { + "type": "string" + }, + "default": { + "type": "string" + }, + "effective": { + "type": "string" + } + }, + "required": [ + "raw", + "default", + "effective" + ] + }, + "display": { + "type": "object", + "properties": { + "raw": { + "type": "string" + }, + "default": { + "type": "string" + }, + "effective": { + "type": "string" + } + }, + "required": [ + "raw", + "default", + "effective" + ] + }, + "credentialPHID": { + "type": "null" + }, + "disabled": { + "type": "boolean" + }, + "builtin": { + "type": "object", + "properties": { + "protocol": { + "type": "string" + }, + "identifier": { + "type": "string" + } + }, + "required": [ + "protocol", + "identifier" + ] + }, + "dateCreated": { + "type": "string" + }, + "dateModified": { + "type": "string" + } + }, + "required": [ + "repositoryPHID", + "uri", + "io", + "display", + "credentialPHID", + "disabled", + "builtin", + "dateCreated", + "dateModified" + ] + } + }, + "required": [ + "id", + "type", + "phid", + "fields" + ] + }, + { + "type": "object", + "properties": { + "id": { + "type": "string" + }, + "type": { + "type": "string" + }, + "phid": { + "type": "string" + }, + "fields": { + "type": "object", + "properties": { + "repositoryPHID": { + "type": "string" + }, + "uri": { + "type": "object", + "properties": { + "raw": { + "type": "string" + }, + "display": { + "type": "string" + }, + "effective": { + "type": "string" + }, + "normalized": { + "type": "string" + } + }, + "required": [ + "raw", + "display", + "effective", + "normalized" + ] + }, + "io": { + "type": "object", + "properties": { + "raw": { + "type": "string" + }, + "default": { + "type": "string" + }, + "effective": { + "type": "string" + } + }, + "required": [ + "raw", + "default", + "effective" + ] + }, + "display": { + "type": "object", + "properties": { + "raw": { + "type": "string" + }, + "default": { + "type": "string" + }, + "effective": { + "type": "string" + } + }, + "required": [ + "raw", + "default", + "effective" + ] + }, + "credentialPHID": { + "type": "null" + }, + "disabled": { + "type": "boolean" + }, + "builtin": { + "type": "object", + "properties": { + "protocol": { + "type": "string" + }, + "identifier": { + "type": "string" + } + }, + "required": [ + "protocol", + "identifier" + ] + }, + "dateCreated": { + "type": "string" + }, + "dateModified": { + "type": "string" + } + }, + "required": [ + "repositoryPHID", + "uri", + "io", + "display", + "credentialPHID", + "disabled", + "builtin", + "dateCreated", + "dateModified" + ] + } + }, + "required": [ + "id", + "type", + "phid", + "fields" + ] + }, + { + "type": "object", + "properties": { + "id": { + "type": "string" + }, + "type": { + "type": "string" + }, + "phid": { + "type": "string" + }, + "fields": { + "type": "object", + "properties": { + "repositoryPHID": { + "type": "string" + }, + "uri": { + "type": "object", + "properties": { + "raw": { + "type": "string" + }, + "display": { + "type": "string" + }, + "effective": { + "type": "string" + }, + "normalized": { + "type": "string" + } + }, + "required": [ + "raw", + "display", + "effective", + "normalized" + ] + }, + "io": { + "type": "object", + "properties": { + "raw": { + "type": "string" + }, + "default": { + "type": "string" + }, + "effective": { + "type": "string" + } + }, + "required": [ + "raw", + "default", + "effective" + ] + }, + "display": { + "type": "object", + "properties": { + "raw": { + "type": "string" + }, + "default": { + "type": "string" + }, + "effective": { + "type": "string" + } + }, + "required": [ + "raw", + "default", + "effective" + ] + }, + "credentialPHID": { + "type": "null" + }, + "disabled": { + "type": "boolean" + }, + "builtin": { + "type": "object", + "properties": { + "protocol": { + "type": "string" + }, + "identifier": { + "type": "string" + } + }, + "required": [ + "protocol", + "identifier" + ] + }, + "dateCreated": { + "type": "string" + }, + "dateModified": { + "type": "string" + } + }, + "required": [ + "repositoryPHID", + "uri", + "io", + "display", + "credentialPHID", + "disabled", + "builtin", + "dateCreated", + "dateModified" + ] + } + }, + "required": [ + "id", + "type", + "phid", + "fields" + ] + } + ] + } + }, + "required": [ + "uris" + ] + }, + "metrics": { + "type": "object", + "properties": { + "commitCount": { + "type": "integer" + }, + "recentCommit": { + "type": "object", + "properties": { + "identifier": { + "type": "string" + }, + "repositoryPHID": { + "type": "string" + }, + "author": { + "type": "object", + "properties": { + "name": { + "type": "string" + }, + "email": { + "type": "string" + }, + "raw": { + "type": "string" + }, + "epoch": { + "type": "integer" + }, + "identityPHID": { + "type": "string" + }, + "userPHID": { + "type": "string" + } + }, + "required": [ + "name", + "email", + "raw", + "epoch", + "identityPHID", + "userPHID" + ] + }, + "committer": { + "type": "object", + "properties": { + "name": { + "type": "string" + }, + "email": { + "type": "string" + }, + "raw": { + "type": "string" + }, + "epoch": { + "type": "integer" + }, + "identityPHID": { + "type": "string" + }, + "userPHID": { + "type": "string" + } + }, + "required": [ + "name", + "email", + "raw", + "epoch", + "identityPHID", + "userPHID" + ] + }, + "isUnreachable": { + "type": "boolean" + }, + "isImported": { + "type": "boolean" + }, + "auditStatus": { + "type": "object", + "properties": { + "value": { + "type": "string" + }, + "name": { + "type": "string" + }, + "closed": { + "type": "boolean" + }, + "color.ansi": { + "type": "null" + } + }, + "required": [ + "value", + "name", + "closed", + "color.ansi" + ] + }, + "message": { + "type": "string" + } + }, + "required": [ + "identifier", + "repositoryPHID", + "author", + "committer", + "isUnreachable", + "isImported", + "auditStatus", + "message" + ] + } + }, + "required": [ + "commitCount", + "recentCommit" + ] + } + }, + "required": [ + "projects", + "uris", + "metrics" + ] + } + }, + "required": [ + "id", + "type", + "phid", + "fields", + "attachments" + ] } }, "required": [ @@ -189,6 +1240,7 @@ "type", "phid", "fields", - "attachments" + "attachments", + "repository" ] } \ No newline at end of file diff --git a/sources/phabricator-source/src/index.ts b/sources/phabricator-source/src/index.ts index d9061c06d..5454cdb25 100644 --- a/sources/phabricator-source/src/index.ts +++ b/sources/phabricator-source/src/index.ts @@ -10,8 +10,7 @@ import { import VError from 'verror'; import {Phabricator, PhabricatorConfig} from './phabricator'; -import {Repositories} from './streams'; -import {Commits} from './streams/commits'; +import {Commits, Repositories} from './streams'; /** The main entry point. */ export function mainCommand(): Command { diff --git a/sources/phabricator-source/src/phabricator.ts b/sources/phabricator-source/src/phabricator.ts index b21225bf6..d22b4e6b6 100644 --- a/sources/phabricator-source/src/phabricator.ts +++ b/sources/phabricator-source/src/phabricator.ts @@ -2,8 +2,9 @@ import {Condoit} from 'condoit'; import iDiffusion from 'condoit/dist/interfaces/iDiffusion'; import {ErrorCodes, phid} from 'condoit/dist/interfaces/iGlobal'; import {AirbyteLogger} from 'faros-airbyte-cdk'; -import _ from 'lodash'; +import {trim, uniq} from 'lodash'; import moment, {Moment} from 'moment'; +import {Dictionary} from 'ts-essentials'; import {VError} from 'verror'; export const PHABRICATOR_DEFAULT_LIMIT = 100; @@ -30,55 +31,14 @@ interface PagedResult extends ErrorCodes { }; } -export interface Commit { - fields: { - identifier: string; - repositoryPHID: phid; - author: { - name: string; - email: string; - raw: string; - epoch: number; - identityPHID: phid; - userPHID: phid; - }; - // 'committer' is mispelled as 'commiter' in the original library - // so I had to copy the type here until fixed - - // https://github.com/securisec/condoit/issues/8 - committer: { - name: string; - email: string; - raw: string; - epoch: number; - identityPHID: phid; - userPHID: phid; - }; - isImported: boolean; - isUnreachable: boolean; - auditStatus: { - value: string; - name: string; - closed: boolean; - 'color.ansi': string; - }; - message: string; - policy: { - view: string; - edit: string; - }; - }; - attachments: { - subscribers: { - subscriberPHIDs: Array; - subscriberCount: number; - viewerIsSubscribed: boolean; - }; - projects: { - projectPHIDs: Array; - }; - }; +export interface Commit extends iDiffusion.retDiffusionCommitSearchData { + // Added full repository information as well + repository?: Repository; } export class Phabricator { + private static repoCacheById: Dictionary = {}; + private static repoCacheByName: Dictionary = {}; + constructor( readonly client: Condoit, readonly startDate: Moment, @@ -121,64 +81,102 @@ export class Phabricator { if (Array.isArray(s)) return s; return s .split(sep) - .map(_.trim) + .map((v) => trim(v)) .filter((v) => v.length > 0); } private async *paginate( limit: number, fetch: (after?: string) => Promise>, - process: (item: T) => R | undefined, + process: (data: T[]) => Promise, earlyTermination = true ): AsyncGenerator { let after: string = null; let res: PagedResult | undefined; do { res = await fetch(after); - after = res.result.cursor.after; - if (res.error_code || res.error_info) { - throw new VError(`${res.error_code}: ${res.error_info}`); + if (res?.error_code || res?.error_info) { + throw new VError(`${res?.error_code}: ${res?.error_info}`); } - res.result.data; let count = 0; - for (const data of res.result.data) { - const item = process(data); + const processed = await process(res?.result?.data ?? []); + for (const item of processed) { if (item) { count++; yield item; } } if (earlyTermination && count < limit) return; + after = res?.result?.cursor?.after; } while (after); } async *getRepositories( + filter: { + repoIds?: phid[]; + repoNames?: string[]; + }, createdAt?: number, limit = this.limit ): AsyncGenerator { const created = Math.max(createdAt ?? 0, this.startDate.unix()); this.logger.debug(`Fetching repositories created since ${created}`); + const attachments = {projects: false, uris: true, metrics: true}; + let constraints = {}; + + if (filter.repoIds?.length > 0 || filter.repoNames?.length > 0) { + const missing = []; + const cachedRepos = filter.repoIds + ? filter.repoIds.flatMap((id) => { + const res = Phabricator.repoCacheById[id]; + if (!res) missing.push(id); + return res ? [res] : []; + }) + : filter.repoNames.flatMap((name) => { + const res = Phabricator.repoCacheByName[name]; + if (!res) missing.push(name); + return res ? [res] : []; + }); + + this.logger.debug( + `Retrieved ${cachedRepos.length} repos from cache (${missing.length} missed)` + ); + // Return all the cached repositories + for (const repo of cachedRepos) { + yield repo; + } + // We got all the repos from the cache - nothing left to do + if (missing.length == 0) return; + // Fetch missing repos from from the API + constraints = filter.repoIds ? {phids: missing} : {shortNames: missing}; + } + yield* this.paginate( limit, - (after) => - this.client.diffusion.repositorySearch({ + (after) => { + return this.client.diffusion.repositorySearch({ queryKey: 'all', order: 'newest', - constraints: { - shortNames: this.repositories, - }, - attachments: { - projects: false, - uris: true, - metrics: true, - }, + constraints, + attachments, limit, after, - }), - (item) => { - if (item.fields.dateCreated >= created) return item; - return undefined; + }); + }, + async (repos) => { + const newRepos = repos.filter( + (repo) => repo.fields.dateCreated > created + ); + // Cache repositories for other queries + for (const repo of newRepos) { + this.logger.debug( + `Cached repo ${repo.fields.shortName} (${repo.phid})` + ); + Phabricator.repoCacheById[repo.phid] = repo; + Phabricator.repoCacheByName[repo.fields.shortName] = repo; + } + return newRepos; } ); } @@ -187,14 +185,20 @@ export class Phabricator { committedAt?: number, limit = this.limit ): AsyncGenerator { - const repositoryIds = []; + const committed = Math.max(committedAt ?? 0, this.startDate.unix()); + this.logger.debug(`Fetching commits committed since ${committedAt}`); + + // Only repository IDs work as constraint filter for commits, + // therefore we do an extra lookup here + const repositories = []; if (this.repositories.length > 0) { - for await (const repo of this.getRepositories()) { - repositoryIds.push(repo.phid); + const repos = this.getRepositories({repoNames: this.repositories}); + for await (const repo of repos) { + repositories.push(repo.phid); } } - const committed = Math.max(committedAt ?? 0, this.startDate.unix()); - this.logger.debug(`Fetching commits committed since ${committedAt}`); + const constraints = {repositories, unreachable: false}; + const attachments = {projects: false, subscribers: false}; yield* this.paginate( limit, @@ -202,22 +206,29 @@ export class Phabricator { this.client.diffusion.commitSearch({ queryKey: 'all', order: 'newest', - constraints: { - repositories: repositoryIds, - unreachable: false, - }, - attachments: { - projects: false, - subscribers: false, - }, + constraints, + attachments, limit, after, }), + async (commits) => { + const newCommits = commits + .map((commit) => commit as Commit) + .filter((commit) => commit.fields.committer.epoch > committed); - (item) => { - const commit = item as any as Commit; - if (commit.fields.committer.epoch >= committed) return commit; - return undefined; + // Extend commits with full repository information if present + const newCommitRepoIds = uniq( + newCommits.map((c) => c.fields.repositoryPHID) + ); + const newCommitRepos: Dictionary = {}; + const repos = this.getRepositories({repoIds: newCommitRepoIds}); + for await (const repo of repos) { + newCommitRepos[repo.phid] = repo; + } + return newCommits.map((commit) => { + commit.repository = newCommitRepos[commit.fields.repositoryPHID]; + return commit; + }); } ); } diff --git a/sources/phabricator-source/src/streams/commits.ts b/sources/phabricator-source/src/streams/commits.ts index 1321de613..60f8d17c3 100644 --- a/sources/phabricator-source/src/streams/commits.ts +++ b/sources/phabricator-source/src/streams/commits.ts @@ -52,6 +52,7 @@ export class Commits extends AirbyteStreamBase { const phabricator = await Phabricator.make(this.config, this.logger); const state = syncMode === SyncMode.INCREMENTAL ? streamState : undefined; const committedAt = state?.latestCommittedAt ?? 0; + yield* phabricator.getCommits(committedAt); } } diff --git a/sources/phabricator-source/src/streams/index.ts b/sources/phabricator-source/src/streams/index.ts index b7c928f34..95243bda8 100644 --- a/sources/phabricator-source/src/streams/index.ts +++ b/sources/phabricator-source/src/streams/index.ts @@ -1,3 +1,4 @@ +import {Commits} from './commits'; import {Repositories} from './repositories'; -export {Repositories}; +export {Commits, Repositories}; diff --git a/sources/phabricator-source/src/streams/repositories.ts b/sources/phabricator-source/src/streams/repositories.ts index 715b1d69c..8d775dbd9 100644 --- a/sources/phabricator-source/src/streams/repositories.ts +++ b/sources/phabricator-source/src/streams/repositories.ts @@ -49,6 +49,15 @@ export class Repositories extends AirbyteStreamBase { const phabricator = await Phabricator.make(this.config, this.logger); const state = syncMode === SyncMode.INCREMENTAL ? streamState : undefined; const createdAt = state?.latestCreatedAt ?? 0; - yield* phabricator.getRepositories(createdAt); + + if (phabricator.repositories.length > 0) { + this.logger.info( + `Fetching repositories: ${phabricator.repositories.join(',')}` + ); + } + yield* phabricator.getRepositories( + {repoNames: phabricator.repositories}, + createdAt + ); } }