[ML] File upload api refactor #210865

Merged 23 commits into main from jgowdyelastic:file-upload-api-refactor on Feb 26, 2025
Commits
0891998
[ML] File upload api refactor
jgowdyelastic Feb 12, 2025
72e2ff5
clean up
jgowdyelastic Feb 12, 2025
6053688
Merge branch 'main' into file-upload-api-refactor
jgowdyelastic Feb 12, 2025
8e023eb
translations
jgowdyelastic Feb 12, 2025
c5dc8b1
Merge branch 'file-upload-api-refactor' of github.com:jgowdyelastic/k…
jgowdyelastic Feb 12, 2025
b029475
Merge remote-tracking branch 'origin/main' into file-upload-api-refactor
jgowdyelastic Feb 12, 2025
c9efba9
bumping api version
jgowdyelastic Feb 12, 2025
ef58e54
Merge remote-tracking branch 'origin/main' into file-upload-api-refactor
jgowdyelastic Feb 13, 2025
bfeafb0
fix for files with no pipeline
jgowdyelastic Feb 13, 2025
5e17ae7
Merge branch 'main' into file-upload-api-refactor
jgowdyelastic Feb 17, 2025
44e117f
Merge branch 'main' into file-upload-api-refactor
jgowdyelastic Feb 17, 2025
0a0105d
Merge branch 'main' into file-upload-api-refactor
jgowdyelastic Feb 18, 2025
19e72c4
Merge branch 'main' into file-upload-api-refactor
jgowdyelastic Feb 19, 2025
835d51b
Merge branch 'main' into file-upload-api-refactor
jgowdyelastic Feb 19, 2025
d030a77
Merge branch 'main' into file-upload-api-refactor
jgowdyelastic Feb 20, 2025
3b340af
import type
jgowdyelastic Feb 21, 2025
02c6fde
Merge branch 'main' into file-upload-api-refactor
jgowdyelastic Feb 21, 2025
5e32334
Merge branch 'main' into file-upload-api-refactor
jgowdyelastic Feb 24, 2025
db2d59d
Merge branch 'main' into file-upload-api-refactor
jgowdyelastic Feb 24, 2025
c29536a
Merge branch 'main' into file-upload-api-refactor
jgowdyelastic Feb 25, 2025
972e733
Merge branch 'main' into file-upload-api-refactor
jgowdyelastic Feb 25, 2025
b379719
fixing types
jgowdyelastic Feb 25, 2025
068fa0f
Merge branch 'main' into file-upload-api-refactor
jgowdyelastic Feb 26, 2025
@@ -84,7 +84,7 @@ export async function importData(props: Props, config: Config, setState: (state:

let settings = {};
let mappings = {};
let pipeline;
const pipelines = [];

try {
settings = JSON.parse(indexSettingsString);
@@ -108,7 +108,7 @@ export async function importData(props: Props, config: Config, setState: (state:

try {
if (createPipeline) {
pipeline = JSON.parse(pipelineString) as IngestPipeline;
pipelines.push(JSON.parse(pipelineString) as IngestPipeline);
}
} catch (error) {
success = false;
@@ -178,7 +178,12 @@ export async function importData(props: Props, config: Config, setState: (state:
return;
}

const initializeImportResp = await importer.initializeImport(index, settings, mappings, pipeline);
const initializeImportResp = await importer.initializeImport(
index,
settings,
mappings,
pipelines
);

if (initializeImportResp.success === false) {
errors.push(initializeImportResp.error);
@@ -198,16 +203,16 @@ export async function importData(props: Props, config: Config, setState: (state:
});

if (createPipeline) {
const pipelineCreated = initializeImportResp.pipelineId !== undefined;
const pipelinesCreated = initializeImportResp.pipelineIds.length > 0;
if (indexCreated) {
setState({
ingestPipelineCreatedStatus: pipelineCreated
ingestPipelineCreatedStatus: pipelinesCreated
? IMPORT_STATUS.COMPLETE
: IMPORT_STATUS.FAILED,
pipelineId: pipelineCreated ? initializeImportResp.pipelineId : '',
pipelineId: pipelinesCreated ? initializeImportResp.pipelineIds[0] : '',
});
}
success = indexCreated && pipelineCreated;
success = indexCreated && pipelinesCreated;
} else {
success = indexCreated;
}
@@ -218,9 +223,8 @@ export async function importData(props: Props, config: Config, setState: (state:
}

const importResp = await importer.import(
initializeImportResp.id,
index,
pipelineId ?? initializeImportResp.pipelineId,
pipelineId ?? initializeImportResp.pipelineIds[0],
(progress: number) => {
setState({
uploadProgress: progress,
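Read together, the hunks above change importData from passing a single optional pipeline to passing an array, and importer.import() no longer takes the upload id returned by initializeImport. A condensed sketch of the new call sequence, with error handling and setState calls omitted (onProgress is an illustrative callback, not part of the PR):

const pipelines: IngestPipeline[] = [];
if (createPipeline) {
  pipelines.push(JSON.parse(pipelineString) as IngestPipeline);
}

// initializeImport now receives every pipeline definition up front.
const initializeImportResp = await importer.initializeImport(index, settings, mappings, pipelines);

// Created pipelines come back as an array of ids rather than a single pipelineId.
const pipelinesCreated = initializeImportResp.pipelineIds.length > 0;

// import() drops the id argument; the first created pipeline is used unless one is supplied.
const importResp = await importer.import(
  index,
  pipelineId ?? initializeImportResp.pipelineIds[0],
  (progress: number) => onProgress(progress)
);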
@@ -13,7 +13,10 @@ import { switchMap, combineLatest, BehaviorSubject, of } from 'rxjs';
import type { HttpSetup } from '@kbn/core/public';
import type { IImporter } from '@kbn/file-upload-plugin/public/importer/types';
import type { DataViewsServicePublic } from '@kbn/data-views-plugin/public/types';
import type { ImportResponse, IngestPipeline } from '@kbn/file-upload-plugin/common/types';
import type {
IngestPipeline,
InitializeImportResponse,
} from '@kbn/file-upload-plugin/common/types';
import type {
IndicesIndexSettings,
MappingTypeMapping,
@@ -64,7 +67,7 @@ export class FileManager {
private mappingsCheckSubscription: Subscription;
private settings;
private mappings: MappingTypeMapping | null = null;
private pipelines: IngestPipeline[] | null = null;
private pipelines: Array<IngestPipeline | undefined> = [];
private inferenceId: string | null = null;
private importer: IImporter | null = null;
private timeFieldName: string | undefined | null = null;
@@ -221,7 +224,7 @@ export class FileManager {
return createMergedMappings(files);
}

private getPipelines(): IngestPipeline[] {
private getPipelines(): Array<IngestPipeline | undefined> {
const files = this.getFiles();
return files.map((file) => file.getPipeline());
}
@@ -256,29 +259,32 @@
});
}

const createPipelines = this.pipelines.length > 0;

this.setStatus({
indexCreated: STATUS.STARTED,
pipelineCreated: STATUS.STARTED,
pipelineCreated: createPipelines ? STATUS.STARTED : STATUS.NA,
});

let indexCreated = false;
let pipelineCreated = false;
let initializeImportResp: ImportResponse | undefined;
let pipelinesCreated = false;
let initializeImportResp: InitializeImportResponse | undefined;

try {
initializeImportResp = await this.importer.initializeImport(
indexName,
this.settings,
this.mappings,
this.pipelines[0],
this.pipelines
);
this.timeFieldName = this.importer.getTimeField();
indexCreated = initializeImportResp.index !== undefined;
pipelineCreated = initializeImportResp.pipelineId !== undefined;
pipelinesCreated = initializeImportResp.pipelineIds.length > 0;
this.setStatus({
indexCreated: indexCreated ? STATUS.COMPLETED : STATUS.FAILED,
pipelineCreated: pipelineCreated ? STATUS.COMPLETED : STATUS.FAILED,
...(createPipelines
? { pipelineCreated: pipelinesCreated ? STATUS.COMPLETED : STATUS.FAILED }
: {}),
});

if (initializeImportResp.error) {
@@ -299,7 +305,11 @@
return null;
}

if (!indexCreated || !pipelineCreated || !initializeImportResp) {
if (
indexCreated === false ||
(createPipelines && pipelinesCreated === false) ||
!initializeImportResp
) {
return null;
}

@@ -309,16 +319,12 @@

// import data
const files = this.getFiles();
const createdPipelineIds = initializeImportResp.pipelineIds;

try {
await Promise.all(
files.map(async (file, i) => {
await file.import(
initializeImportResp!.id,
indexName,
this.mappings!,
`${indexName}-${i}-pipeline`
);
await file.import(indexName, this.mappings!, createdPipelineIds[i] ?? undefined);
})
);
} catch (error) {
@@ -345,9 +351,7 @@
this.setStatus({
pipelinesDeleted: STATUS.STARTED,
});
await this.importer.deletePipelines(
this.pipelines.map((p, i) => `${indexName}-${i}-pipeline`)
);
await this.importer.deletePipelines();
this.setStatus({
pipelinesDeleted: STATUS.COMPLETED,
});
@@ -461,6 +465,9 @@ export class FileManager {
};

this.pipelines.forEach((pipeline) => {
if (pipeline === undefined) {
return;
}
pipeline.processors.push({
set: {
field: 'content',
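In FileManager, each analyzed file now contributes its own (possibly undefined) pipeline, pipeline creation is skipped entirely when there are none, and the ids returned by initializeImport are matched back to files by position. A rough sketch of that pairing, assuming getFiles() and the importer behave as shown in the diff:

// One entry per file; a file with no ingest pipeline contributes undefined.
const pipelines = this.getPipelines(); // Array<IngestPipeline | undefined>
const createPipelines = pipelines.length > 0;

const resp = await this.importer.initializeImport(indexName, this.settings, this.mappings, pipelines);

// pipelineIds is index-aligned with the files, so file i imports through resp.pipelineIds[i].
await Promise.all(
  this.getFiles().map((file, i) =>
    file.import(indexName, this.mappings!, resp.pipelineIds[i] ?? undefined)
  )
);

// Clean-up no longer rebuilds `${indexName}-${i}-pipeline` names; deletePipelines() takes no arguments.
await this.importer.deletePipelines();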
@@ -169,13 +169,8 @@ export class FileWrapper {
public getMappings() {
return this.analyzedFile$.getValue().results?.mappings;
}
public getPipeline(): IngestPipeline {
return (
this.analyzedFile$.getValue().results?.ingest_pipeline ?? {
description: '',
processors: [],
}
);
public getPipeline(): IngestPipeline | undefined {
return this.analyzedFile$.getValue().results?.ingest_pipeline;
}
public getFormat() {
return this.analyzedFile$.getValue().results?.format;
@@ -184,23 +179,24 @@
return this.analyzedFile$.getValue().data;
}

public async import(id: string, index: string, mappings: MappingTypeMapping, pipelineId: string) {
public async import(index: string, mappings: MappingTypeMapping, pipelineId: string | undefined) {
this.setStatus({ importStatus: STATUS.STARTED });
const format = this.analyzedFile$.getValue().results!.format;
const importer = await this.fileUpload.importerFactory(format, {
excludeLinesPattern: this.analyzedFile$.getValue().results!.exclude_lines_pattern,
multilineStartPattern: this.analyzedFile$.getValue().results!.multiline_start_pattern,
});

importer.initializeWithoutCreate(index, mappings, this.getPipeline());
const ingestPipeline = this.getPipeline();
importer.initializeWithoutCreate(index, mappings, ingestPipeline ? [ingestPipeline] : []);
const data = this.getData();
if (data === null) {
this.setStatus({ importStatus: STATUS.FAILED });
return;
}
importer.read(data);
try {
const resp = await importer.import(id, index, pipelineId, (p) => {
const resp = await importer.import(index, pipelineId, (p) => {
this.setStatus({ importProgress: p });
});
this.setStatus({ docCount: resp.docCount, importStatus: STATUS.COMPLETED });
13 changes: 11 additions & 2 deletions x-pack/platform/plugins/private/file_upload/common/types.ts
@@ -98,10 +98,19 @@ export interface HasImportPermission {

export type InputData = any[];

export interface ImportResponse {
export interface InitializeImportResponse {
success: boolean;
id: string;
index?: string;
index: string;
pipelineIds: Array<string | undefined>;
error?: {
error: estypes.ErrorCause;
};
}

export interface ImportResponse {
success: boolean;
index: string;
pipelineId?: string;
docCount: number;
failures: ImportFailure[];
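The common types are split accordingly: initializeImport now resolves to the new InitializeImportResponse (the index plus the created pipelineIds, which stay index-aligned with the requested pipelines and may contain undefined entries), while the per-chunk import call keeps returning ImportResponse. A hedged example of consuming both, with the guard and callback names chosen here purely for illustration:

const init: InitializeImportResponse = await importer.initializeImport(index, settings, mappings, pipelines);
if (!init.success) {
  // init.error wraps an estypes.ErrorCause
  throw new Error(init.error?.error.reason ?? 'initialize import failed');
}

const resp: ImportResponse = await importer.import(index, init.pipelineIds[0], onProgress);
console.log(`${resp.docCount} docs indexed, ${resp.failures.length} failures`);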
@@ -109,10 +109,7 @@ export class GeoUploadWizard extends Component<FileUploadComponentProps, State>
},
},
};
const ingestPipeline = {
description: '',
processors: [],
};

this.setState({
importStatus: i18n.translate('xpack.fileUpload.geoUploadWizard.dataIndexingStarted', {
defaultMessage: 'Creating index: {indexName}',
@@ -125,7 +122,7 @@ export class GeoUploadWizard extends Component<FileUploadComponentProps, State>
this.state.indexName,
{},
mappings,
ingestPipeline
[]
);
if (!this._isMounted) {
return;
@@ -147,9 +144,8 @@ export class GeoUploadWizard extends Component<FileUploadComponentProps, State>
});
this._geoFileImporter.setSmallChunks(this.state.smallChunks);
const importResults = await this._geoFileImporter.import(
initializeImportResp.id,
this.state.indexName,
initializeImportResp.pipelineId,
initializeImportResp.pipelineIds[0],
(progress) => {
if (this._isMounted) {
this.setState({
@@ -11,11 +11,12 @@ import { i18n } from '@kbn/i18n';
import { ES_FIELD_TYPES } from '@kbn/data-plugin/public';
import { GeoFileImporter, GeoFilePreview } from './types';
import { CreateDocsResponse, ImportResults } from '../types';
import { callImportRoute, Importer, IMPORT_RETRIES, MAX_CHUNK_CHAR_COUNT } from '../importer';
import { Importer, IMPORT_RETRIES, MAX_CHUNK_CHAR_COUNT } from '../importer';
import { MB } from '../../../common/constants';
import type { ImportDoc, ImportFailure, ImportResponse } from '../../../common/types';
import { geoJsonCleanAndValidate } from './geojson_clean_and_validate';
import { createChunks } from './create_chunks';
import { callImportRoute } from '../routes';

const BLOCK_SIZE_MB = 5 * MB;

@@ -80,16 +81,15 @@ export class AbstractGeoFileImporter extends Importer implements GeoFileImporter
}

public async import(
id: string,
index: string,
pipelineId: string | undefined,
pipelineId: string,
setImportProgress: (progress: number) => void
): Promise<ImportResults> {
if (!id || !index) {
if (!index) {
return {
success: false,
error: i18n.translate('xpack.fileUpload.import.noIdOrIndexSuppliedErrorMessage', {
defaultMessage: 'no ID or index supplied',
error: i18n.translate('xpack.fileUpload.import.noIndexSuppliedErrorMessage', {
defaultMessage: 'No index supplied',
}),
};
}
@@ -134,7 +134,6 @@ export class AbstractGeoFileImporter extends Importer implements GeoFileImporter
this._blockSizeInBytes = 0;

importBlockPromise = this._importBlock(
id,
index,
pipelineId,
chunks,
@@ -167,9 +166,8 @@ export class AbstractGeoFileImporter extends Importer implements GeoFileImporter
}

private async _importBlock(
id: string,
index: string,
pipelineId: string | undefined,
pipelineId: string,
chunks: ImportDoc[][],
blockSizeInBytes: number,
setImportProgress: (progress: number) => void
@@ -184,24 +182,15 @@ export class AbstractGeoFileImporter extends Importer implements GeoFileImporter
success: false,
failures: [],
docCount: 0,
id: '',
index: '',
pipelineId: '',
};
while (resp.success === false && retries > 0) {
try {
resp = await callImportRoute({
id,
index,
ingestPipelineId: pipelineId,
data: chunks[i],
settings: {},
mappings: {},
ingestPipeline:
pipelineId !== undefined
? {
id: pipelineId,
}
: undefined,
});

if (!this._isActive) {
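The geo importer follows the same pattern: there is no upload id any more, and each retried block goes to callImportRoute with just the target index, the ingest pipeline id, and the chunk data. A trimmed sketch of that loop, following the shapes visible in the diff (the catch/early-exit bookkeeping is abbreviated):

let retries = IMPORT_RETRIES;
let resp: ImportResponse = { success: false, failures: [], docCount: 0, index: '', pipelineId: '' };

while (resp.success === false && retries > 0) {
  // The route payload no longer includes an id, settings, mappings or an inline pipeline definition.
  resp = await callImportRoute({
    index,
    ingestPipelineId: pipelineId,
    data: chunks[i],
  });
  retries--; // illustrative: the real loop also stops when the importer is deactivated
}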