From 0891998ee9f32adaf46143b7b6998005d71b327e Mon Sep 17 00:00:00 2001 From: James Gowdy Date: Wed, 12 Feb 2025 15:27:28 +0000 Subject: [PATCH 1/7] [ML] File upload api refactor --- .../components/import_view/import.ts | 22 ++- .../public/lite/file_manager/file_manager.ts | 23 ++- .../public/lite/file_manager/file_wrapper.ts | 6 +- .../private/file_upload/common/types.ts | 15 +- .../public/components/geo_upload_wizard.tsx | 5 +- .../geo/abstract_geo_file_importer.tsx | 23 +-- .../file_upload/public/importer/importer.ts | 158 +++++----------- .../file_upload/public/importer/routes.ts | 67 +++++++ .../file_upload/public/importer/types.ts | 17 +- .../private/file_upload/server/import_data.ts | 93 +++++----- .../private/file_upload/server/routes.ts | 169 ++++++++++++++---- .../private/file_upload/server/schemas.ts | 25 ++- 12 files changed, 357 insertions(+), 266 deletions(-) create mode 100644 x-pack/platform/plugins/private/file_upload/public/importer/routes.ts diff --git a/x-pack/platform/plugins/private/data_visualizer/public/application/file_data_visualizer/components/import_view/import.ts b/x-pack/platform/plugins/private/data_visualizer/public/application/file_data_visualizer/components/import_view/import.ts index c8fb9458b7c75..4a52e01ce8939 100644 --- a/x-pack/platform/plugins/private/data_visualizer/public/application/file_data_visualizer/components/import_view/import.ts +++ b/x-pack/platform/plugins/private/data_visualizer/public/application/file_data_visualizer/components/import_view/import.ts @@ -84,7 +84,7 @@ export async function importData(props: Props, config: Config, setState: (state: let settings = {}; let mappings = {}; - let pipeline; + const pipelines = []; try { settings = JSON.parse(indexSettingsString); @@ -108,7 +108,7 @@ export async function importData(props: Props, config: Config, setState: (state: try { if (createPipeline) { - pipeline = JSON.parse(pipelineString) as IngestPipeline; + pipelines.push(JSON.parse(pipelineString) as IngestPipeline); } } catch (error) { success = false; @@ -178,7 +178,12 @@ export async function importData(props: Props, config: Config, setState: (state: return; } - const initializeImportResp = await importer.initializeImport(index, settings, mappings, pipeline); + const initializeImportResp = await importer.initializeImport( + index, + settings, + mappings, + pipelines + ); if (initializeImportResp.success === false) { errors.push(initializeImportResp.error); @@ -198,16 +203,16 @@ export async function importData(props: Props, config: Config, setState: (state: }); if (createPipeline) { - const pipelineCreated = initializeImportResp.pipelineId !== undefined; + const pipelinesCreated = !!initializeImportResp.pipelineIds?.length; if (indexCreated) { setState({ - ingestPipelineCreatedStatus: pipelineCreated + ingestPipelineCreatedStatus: pipelinesCreated ? IMPORT_STATUS.COMPLETE : IMPORT_STATUS.FAILED, - pipelineId: pipelineCreated ? initializeImportResp.pipelineId : '', + pipelineId: pipelinesCreated ? initializeImportResp.pipelineIds[0] : '', }); } - success = indexCreated && pipelineCreated; + success = indexCreated && pipelinesCreated; } else { success = indexCreated; } @@ -218,9 +223,8 @@ export async function importData(props: Props, config: Config, setState: (state: } const importResp = await importer.import( - initializeImportResp.id, index, - pipelineId ?? initializeImportResp.pipelineId, + pipelineId ?? initializeImportResp.pipelineIds[0], (progress: number) => { setState({ uploadProgress: progress, diff --git a/x-pack/platform/plugins/private/data_visualizer/public/lite/file_manager/file_manager.ts b/x-pack/platform/plugins/private/data_visualizer/public/lite/file_manager/file_manager.ts index 0eccd4ad2d72f..c62384ade586d 100644 --- a/x-pack/platform/plugins/private/data_visualizer/public/lite/file_manager/file_manager.ts +++ b/x-pack/platform/plugins/private/data_visualizer/public/lite/file_manager/file_manager.ts @@ -13,7 +13,10 @@ import { switchMap, combineLatest, BehaviorSubject, of } from 'rxjs'; import type { HttpSetup } from '@kbn/core/public'; import type { IImporter } from '@kbn/file-upload-plugin/public/importer/types'; import type { DataViewsServicePublic } from '@kbn/data-views-plugin/public/types'; -import type { ImportResponse, IngestPipeline } from '@kbn/file-upload-plugin/common/types'; +import type { + IngestPipeline, + InitializeImportResponse, +} from '@kbn/file-upload-plugin/common/types'; import type { IndicesIndexSettings, MappingTypeMapping, @@ -262,23 +265,22 @@ export class FileManager { }); let indexCreated = false; - let pipelineCreated = false; - let initializeImportResp: ImportResponse | undefined; + let pipelinesCreated = false; + let initializeImportResp: InitializeImportResponse | undefined; try { initializeImportResp = await this.importer.initializeImport( indexName, this.settings, this.mappings, - this.pipelines[0], this.pipelines ); this.timeFieldName = this.importer.getTimeField(); indexCreated = initializeImportResp.index !== undefined; - pipelineCreated = initializeImportResp.pipelineId !== undefined; + pipelinesCreated = !!initializeImportResp.pipelineIds?.length; this.setStatus({ indexCreated: indexCreated ? STATUS.COMPLETED : STATUS.FAILED, - pipelineCreated: pipelineCreated ? STATUS.COMPLETED : STATUS.FAILED, + pipelineCreated: pipelinesCreated ? STATUS.COMPLETED : STATUS.FAILED, }); if (initializeImportResp.error) { @@ -299,7 +301,7 @@ export class FileManager { return null; } - if (!indexCreated || !pipelineCreated || !initializeImportResp) { + if (!indexCreated || !pipelinesCreated || !initializeImportResp) { return null; } @@ -313,12 +315,7 @@ export class FileManager { try { await Promise.all( files.map(async (file, i) => { - await file.import( - initializeImportResp!.id, - indexName, - this.mappings!, - `${indexName}-${i}-pipeline` - ); + await file.import(indexName, this.mappings!, `${indexName}-${i}-pipeline`); }) ); } catch (error) { diff --git a/x-pack/platform/plugins/private/data_visualizer/public/lite/file_manager/file_wrapper.ts b/x-pack/platform/plugins/private/data_visualizer/public/lite/file_manager/file_wrapper.ts index f1885dff92f74..3460798bdc2c4 100644 --- a/x-pack/platform/plugins/private/data_visualizer/public/lite/file_manager/file_wrapper.ts +++ b/x-pack/platform/plugins/private/data_visualizer/public/lite/file_manager/file_wrapper.ts @@ -184,7 +184,7 @@ export class FileWrapper { return this.analyzedFile$.getValue().data; } - public async import(id: string, index: string, mappings: MappingTypeMapping, pipelineId: string) { + public async import(index: string, mappings: MappingTypeMapping, pipelineId: string) { this.setStatus({ importStatus: STATUS.STARTED }); const format = this.analyzedFile$.getValue().results!.format; const importer = await this.fileUpload.importerFactory(format, { @@ -192,7 +192,7 @@ export class FileWrapper { multilineStartPattern: this.analyzedFile$.getValue().results!.multiline_start_pattern, }); - importer.initializeWithoutCreate(index, mappings, this.getPipeline()); + importer.initializeWithoutCreate(index, mappings, [this.getPipeline()]); const data = this.getData(); if (data === null) { this.setStatus({ importStatus: STATUS.FAILED }); @@ -200,7 +200,7 @@ export class FileWrapper { } importer.read(data); try { - const resp = await importer.import(id, index, pipelineId, (p) => { + const resp = await importer.import(index, pipelineId, (p) => { this.setStatus({ importProgress: p }); }); this.setStatus({ docCount: resp.docCount, importStatus: STATUS.COMPLETED }); diff --git a/x-pack/platform/plugins/private/file_upload/common/types.ts b/x-pack/platform/plugins/private/file_upload/common/types.ts index 409b5fcac80a1..9064c437b121c 100644 --- a/x-pack/platform/plugins/private/file_upload/common/types.ts +++ b/x-pack/platform/plugins/private/file_upload/common/types.ts @@ -98,11 +98,20 @@ export interface HasImportPermission { export type InputData = any[]; -export interface ImportResponse { +export interface InitializeImportResponse { success: boolean; id: string; - index?: string; - pipelineId?: string; + index: string; + pipelineIds: string[]; + error?: { + error: estypes.ErrorCause; + }; +} + +export interface ImportResponse { + success: boolean; + index: string; + pipelineId: string; docCount: number; failures: ImportFailure[]; error?: { diff --git a/x-pack/platform/plugins/private/file_upload/public/components/geo_upload_wizard.tsx b/x-pack/platform/plugins/private/file_upload/public/components/geo_upload_wizard.tsx index 643642958e2b5..62afd84117b06 100644 --- a/x-pack/platform/plugins/private/file_upload/public/components/geo_upload_wizard.tsx +++ b/x-pack/platform/plugins/private/file_upload/public/components/geo_upload_wizard.tsx @@ -125,7 +125,7 @@ export class GeoUploadWizard extends Component this.state.indexName, {}, mappings, - ingestPipeline + [ingestPipeline] ); if (!this._isMounted) { return; @@ -147,9 +147,8 @@ export class GeoUploadWizard extends Component }); this._geoFileImporter.setSmallChunks(this.state.smallChunks); const importResults = await this._geoFileImporter.import( - initializeImportResp.id, this.state.indexName, - initializeImportResp.pipelineId, + initializeImportResp.pipelineIds[0], (progress) => { if (this._isMounted) { this.setState({ diff --git a/x-pack/platform/plugins/private/file_upload/public/importer/geo/abstract_geo_file_importer.tsx b/x-pack/platform/plugins/private/file_upload/public/importer/geo/abstract_geo_file_importer.tsx index c79228bee933b..03fb092d1504a 100644 --- a/x-pack/platform/plugins/private/file_upload/public/importer/geo/abstract_geo_file_importer.tsx +++ b/x-pack/platform/plugins/private/file_upload/public/importer/geo/abstract_geo_file_importer.tsx @@ -11,11 +11,12 @@ import { i18n } from '@kbn/i18n'; import { ES_FIELD_TYPES } from '@kbn/data-plugin/public'; import { GeoFileImporter, GeoFilePreview } from './types'; import { CreateDocsResponse, ImportResults } from '../types'; -import { callImportRoute, Importer, IMPORT_RETRIES, MAX_CHUNK_CHAR_COUNT } from '../importer'; +import { Importer, IMPORT_RETRIES, MAX_CHUNK_CHAR_COUNT } from '../importer'; import { MB } from '../../../common/constants'; import type { ImportDoc, ImportFailure, ImportResponse } from '../../../common/types'; import { geoJsonCleanAndValidate } from './geojson_clean_and_validate'; import { createChunks } from './create_chunks'; +import { callImportRoute } from '../routes'; const BLOCK_SIZE_MB = 5 * MB; @@ -80,12 +81,11 @@ export class AbstractGeoFileImporter extends Importer implements GeoFileImporter } public async import( - id: string, index: string, - pipelineId: string | undefined, + pipelineId: string, setImportProgress: (progress: number) => void ): Promise { - if (!id || !index) { + if (!index) { return { success: false, error: i18n.translate('xpack.fileUpload.import.noIdOrIndexSuppliedErrorMessage', { @@ -134,7 +134,6 @@ export class AbstractGeoFileImporter extends Importer implements GeoFileImporter this._blockSizeInBytes = 0; importBlockPromise = this._importBlock( - id, index, pipelineId, chunks, @@ -167,9 +166,8 @@ export class AbstractGeoFileImporter extends Importer implements GeoFileImporter } private async _importBlock( - id: string, index: string, - pipelineId: string | undefined, + pipelineId: string, chunks: ImportDoc[][], blockSizeInBytes: number, setImportProgress: (progress: number) => void @@ -184,24 +182,15 @@ export class AbstractGeoFileImporter extends Importer implements GeoFileImporter success: false, failures: [], docCount: 0, - id: '', index: '', pipelineId: '', }; while (resp.success === false && retries > 0) { try { resp = await callImportRoute({ - id, index, + ingestPipelineId: pipelineId, data: chunks[i], - settings: {}, - mappings: {}, - ingestPipeline: - pipelineId !== undefined - ? { - id: pipelineId, - } - : undefined, }); if (!this._isActive) { diff --git a/x-pack/platform/plugins/private/file_upload/public/importer/importer.ts b/x-pack/platform/plugins/private/file_upload/public/importer/importer.ts index bbb6f86d56be8..34c1fcc721794 100644 --- a/x-pack/platform/plugins/private/file_upload/public/importer/importer.ts +++ b/x-pack/platform/plugins/private/file_upload/public/importer/importer.ts @@ -24,6 +24,7 @@ import type { IngestPipelineWrapper, } from '../../common/types'; import { CreateDocsResponse, IImporter, ImportResults } from './types'; +import { callImportRoute, callInitializeImportRoute } from './routes'; const CHUNK_SIZE = 5000; const REDUCED_CHUNK_SIZE = 100; @@ -36,7 +37,7 @@ export abstract class Importer implements IImporter { protected _docArray: ImportDoc[] = []; protected _chunkSize = CHUNK_SIZE; private _index: string | undefined; - private _pipeline: IngestPipeline | undefined; + private _pipelines: IngestPipelineWrapper[] = []; private _timeFieldName: string | undefined; private _initialized = false; @@ -82,42 +83,27 @@ export abstract class Importer implements IImporter { protected abstract _createDocs(t: string, isLastPart: boolean): CreateDocsResponse; - public async initializeImport( - index: string, - settings: IndicesIndexSettings, - mappings: MappingTypeMapping, - pipeline: IngestPipeline | undefined, - createPipelines?: IngestPipeline[] - ) { - let ingestPipelineWrapper: IngestPipelineWrapper | undefined; - if (pipeline !== undefined) { - updatePipelineTimezone(pipeline); - - if (pipelineContainsSpecialProcessors(pipeline)) { - // pipeline contains processors which we know are slow - // so reduce the chunk size significantly to avoid timeouts - this._chunkSize = REDUCED_CHUNK_SIZE; - } - // if no pipeline has been supplied, - // send an empty object - ingestPipelineWrapper = { - id: `${index}-pipeline`, - pipeline, - }; - } + private _initialize(index: string, mappings: MappingTypeMapping, pipelines: IngestPipeline[]) { + if (pipelines !== undefined) { + for (let i = 0; i < pipelines.length; i++) { + const pipeline = pipelines[i]; + updatePipelineTimezone(pipeline); - let createPipelinesWrappers: IngestPipelineWrapper[] | undefined; - if (createPipelines) { - createPipelinesWrappers = createPipelines.map((p, i) => { - return { + if (pipelineContainsSpecialProcessors(pipeline)) { + // pipeline contains processors which we know are slow + // so reduce the chunk size significantly to avoid timeouts + this._chunkSize = REDUCED_CHUNK_SIZE; + } + // if no pipeline has been supplied, + // send an empty object + this._pipelines.push({ id: `${index}-${i}-pipeline`, - pipeline: p, - }; - }); + pipeline, + }); + } } this._index = index; - this._pipeline = pipeline; // if an @timestamp field has been added to the // mappings, use this field as the time field. @@ -128,68 +114,48 @@ export abstract class Importer implements IImporter { : undefined; this._initialized = true; + } - return await callImportRoute({ - id: undefined, + public async initializeImport( + index: string, + settings: IndicesIndexSettings, + mappings: MappingTypeMapping, + pipelines: IngestPipeline[] + ) { + this._initialize(index, mappings, pipelines); + + return await callInitializeImportRoute({ index, - data: [], settings, mappings, - ingestPipeline: ingestPipelineWrapper, - createPipelines: createPipelinesWrappers, + ingestPipelines: this._pipelines, }); } public async initializeWithoutCreate( index: string, mappings: MappingTypeMapping, - pipeline: IngestPipeline | undefined + pipelines: IngestPipeline[] ) { - if (pipeline !== undefined) { - if (pipelineContainsSpecialProcessors(pipeline)) { - // pipeline contains processors which we know are slow - // so reduce the chunk size significantly to avoid timeouts - this._chunkSize = REDUCED_CHUNK_SIZE; - } - } - - this._index = index; - this._pipeline = pipeline; - - // if an @timestamp field has been added to the - // mappings, use this field as the time field. - // This relies on the field being populated by - // the ingest pipeline on ingest - this._timeFieldName = isPopulatedObject(mappings.properties, [DEFAULT_TIME_FIELD]) - ? DEFAULT_TIME_FIELD - : undefined; - - this._initialized = true; + this._initialize(index, mappings, pipelines); } public async import( - id: string, index: string, - pipelineId: string | undefined, + ingestPipelineId: string, setImportProgress: (progress: number) => void ): Promise { - if (!id || !index) { + if (!index) { return { success: false, error: i18n.translate('xpack.fileUpload.import.noIdOrIndexSuppliedErrorMessage', { - defaultMessage: 'no ID or index supplied', + defaultMessage: 'No index supplied', }), }; } const chunks = createDocumentChunks(this._docArray, this._chunkSize); - const ingestPipeline: IngestPipelineWrapper | undefined = pipelineId - ? { - id: pipelineId, - } - : undefined; - let success = true; const failures: ImportFailure[] = []; let error; @@ -200,7 +166,6 @@ export abstract class Importer implements IImporter { success: false, failures: [], docCount: 0, - id: '', index: '', pipelineId: '', }; @@ -208,12 +173,9 @@ export abstract class Importer implements IImporter { while (resp.success === false && retries > 0) { try { resp = await callImportRoute({ - id, index, + ingestPipelineId, data: chunks[i], - settings: {}, - mappings: {}, - ingestPipeline, }); if (retries < IMPORT_RETRIES) { @@ -269,7 +231,8 @@ export abstract class Importer implements IImporter { } public async previewIndexTimeRange() { - if (this._initialized === false || this._pipeline === undefined) { + const ingestPipeline = this._pipelines[0]; + if (this._initialized === false || ingestPipeline?.pipeline === undefined) { throw new Error('Import has not been initialized'); } @@ -280,7 +243,7 @@ export abstract class Importer implements IImporter { const body = JSON.stringify({ docs: firstDocs.concat(lastDocs), - pipeline: this._pipeline, + pipeline: ingestPipeline.pipeline, timeField: this._timeFieldName, }); return await getHttp().fetch<{ start: number | null; end: number | null }>({ @@ -291,14 +254,9 @@ export abstract class Importer implements IImporter { }); } - public async deletePipelines(pipelineIds: string[]) { - // remove_pipelines - // const body = JSON.stringify({ - // pipelineIds, - // }); - + public async deletePipelines() { return await getHttp().fetch({ - path: `/internal/file_upload/remove_pipelines/${pipelineIds.join(',')}`, + path: `/internal/file_upload/remove_pipelines/${this._pipelines.map((p) => p.id).join(',')}`, method: 'DELETE', version: '1', }); @@ -391,39 +349,3 @@ function pipelineContainsSpecialProcessors(pipeline: IngestPipeline) { const specialProcessors = ['inference', 'enrich']; return intersection(specialProcessors, keys).length !== 0; } - -export function callImportRoute({ - id, - index, - data, - settings, - mappings, - ingestPipeline, - createPipelines, -}: { - id: string | undefined; - index: string; - data: ImportDoc[]; - settings: IndicesIndexSettings; - mappings: MappingTypeMapping; - ingestPipeline: IngestPipelineWrapper | undefined; - createPipelines?: IngestPipelineWrapper[]; -}) { - const query = id !== undefined ? { id } : {}; - const body = JSON.stringify({ - index, - data, - settings, - mappings, - ingestPipeline, - createPipelines, - }); - - return getHttp().fetch({ - path: `/internal/file_upload/import`, - method: 'POST', - version: '1', - query, - body, - }); -} diff --git a/x-pack/platform/plugins/private/file_upload/public/importer/routes.ts b/x-pack/platform/plugins/private/file_upload/public/importer/routes.ts new file mode 100644 index 0000000000000..fb2d4307c65be --- /dev/null +++ b/x-pack/platform/plugins/private/file_upload/public/importer/routes.ts @@ -0,0 +1,67 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { + IndicesIndexSettings, + MappingTypeMapping, +} from '@elastic/elasticsearch/lib/api/typesWithBodyKey'; +import { + IngestPipelineWrapper, + InitializeImportResponse, + ImportDoc, + ImportResponse, +} from '../../common/types'; +import { getHttp } from '../kibana_services'; + +interface CallInitializeImportRoute { + index: string; + settings: IndicesIndexSettings; + mappings: MappingTypeMapping; + ingestPipelines?: IngestPipelineWrapper[]; +} + +interface CallImportRoute { + index: string; + ingestPipelineId: string; + data: ImportDoc[]; +} + +export function callInitializeImportRoute({ + index, + settings, + mappings, + ingestPipelines, +}: CallInitializeImportRoute) { + const body = JSON.stringify({ + index, + settings, + mappings, + ingestPipelines, + }); + + return getHttp().fetch({ + path: `/internal/file_upload/initialize_import`, + method: 'POST', + version: '1', + body, + }); +} + +export function callImportRoute({ index, data, ingestPipelineId }: CallImportRoute) { + const body = JSON.stringify({ + index, + ingestPipelineId, + data, + }); + + return getHttp().fetch({ + path: `/internal/file_upload/import`, + method: 'POST', + version: '1', + body, + }); +} diff --git a/x-pack/platform/plugins/private/file_upload/public/importer/types.ts b/x-pack/platform/plugins/private/file_upload/public/importer/types.ts index 1694b1b2d5de1..bcf0a073e663e 100644 --- a/x-pack/platform/plugins/private/file_upload/public/importer/types.ts +++ b/x-pack/platform/plugins/private/file_upload/public/importer/types.ts @@ -11,7 +11,12 @@ import type { MappingTypeMapping, } from '@elastic/elasticsearch/lib/api/typesWithBodyKey'; -import type { ImportFailure, IngestPipeline, ImportDoc, ImportResponse } from '../../common/types'; +import type { + ImportFailure, + IngestPipeline, + ImportDoc, + InitializeImportResponse, +} from '../../common/types'; export interface ImportConfig { settings: IndicesIndexSettings; @@ -44,18 +49,16 @@ export interface IImporter { index: string, settings: IndicesIndexSettings, mappings: MappingTypeMapping, - pipeline: IngestPipeline | undefined, - createPipelines?: IngestPipeline[] - ): Promise; + pipeline: IngestPipeline[] + ): Promise; initializeWithoutCreate( index: string, mappings: MappingTypeMapping, - pipeline: IngestPipeline | undefined + pipelines: IngestPipeline[] ): void; import( - id: string, index: string, - pipelineId: string | undefined, + ingestPipelineId: string, setImportProgress: (progress: number) => void ): Promise; initialized(): boolean; diff --git a/x-pack/platform/plugins/private/file_upload/server/import_data.ts b/x-pack/platform/plugins/private/file_upload/server/import_data.ts index 605adecb376da..fba3858dd83a8 100644 --- a/x-pack/platform/plugins/private/file_upload/server/import_data.ts +++ b/x-pack/platform/plugins/private/file_upload/server/import_data.ts @@ -13,56 +13,68 @@ import type { MappingTypeMapping, } from '@elastic/elasticsearch/lib/api/typesWithBodyKey'; import { INDEX_META_DATA_CREATED_BY } from '../common/constants'; -import { ImportResponse, ImportFailure, InputData, IngestPipelineWrapper } from '../common/types'; +import { + ImportResponse, + ImportFailure, + InputData, + IngestPipelineWrapper, + InitializeImportResponse, +} from '../common/types'; export function importDataProvider({ asCurrentUser }: IScopedClusterClient) { - async function importData( - id: string | undefined, + async function initializeImport( index: string, settings: IndicesIndexSettings, mappings: MappingTypeMapping, - ingestPipeline: IngestPipelineWrapper | undefined, - createPipelines: IngestPipelineWrapper[], - data: InputData - ): Promise { + ingestPipelines: IngestPipelineWrapper[] + ): Promise { let createdIndex; - let createdPipelineId; - const docCount = data.length; - + const createdPipelineIds: string[] = []; + const id = generateId(); try { - const pipelineId = ingestPipeline?.id; - const pipeline = ingestPipeline?.pipeline; - - if (id === undefined) { - // first chunk of data, create the index and id to return - id = generateId(); - - await createIndex(index, settings, mappings); - createdIndex = index; - - // create the pipeline if one has been supplied - if (createPipelines !== undefined) { - for (const p of createPipelines) { - const resp = await createPipeline(p.id, p.pipeline); - if (resp.acknowledged !== true) { - throw resp; - } - } - } else if (pipelineId !== undefined) { - const resp = await createPipeline(pipelineId, pipeline); + await createIndex(index, settings, mappings); + createdIndex = index; + + // create the pipeline if one has been supplied + if (ingestPipelines !== undefined) { + for (const p of ingestPipelines) { + const resp = await createPipeline(p.id, p.pipeline); + createdPipelineIds.push(p.id); if (resp.acknowledged !== true) { throw resp; } } - createdPipelineId = pipelineId; - } else { - createdIndex = index; - createdPipelineId = pipelineId; } + return { + success: true, + id, + index: createdIndex, + pipelineIds: createdPipelineIds, + }; + } catch (error) { + return { + success: false, + id: id!, + index: createdIndex ?? '', + pipelineIds: createdPipelineIds, + error: error.body !== undefined ? error.body : error, + }; + } + } + + async function importData( + index: string, + ingestPipelineId: string, + data: InputData + ): Promise { + const docCount = data.length; + const pipelineId = ingestPipelineId; + + try { let failures: ImportFailure[] = []; if (data.length) { - const resp = await indexData(index, createdPipelineId, data); + const resp = await indexData(index, pipelineId, data); if (resp.success === false) { if (resp.ingestError) { // all docs failed, abort @@ -77,18 +89,16 @@ export function importDataProvider({ asCurrentUser }: IScopedClusterClient) { return { success: true, - id, - index: createdIndex, - pipelineId: createdPipelineId, + index, + pipelineId, docCount, failures, }; } catch (error) { return { success: false, - id: id!, - index: createdIndex, - pipelineId: createdPipelineId, + index, + pipelineId, error: error.body !== undefined ? error.body : error, docCount, ingestError: error.ingestError, @@ -187,6 +197,7 @@ export function importDataProvider({ asCurrentUser }: IScopedClusterClient) { } return { + initializeImport, importData, }; } diff --git a/x-pack/platform/plugins/private/file_upload/server/routes.ts b/x-pack/platform/plugins/private/file_upload/server/routes.ts index f7ef6e15b0c91..5986b82260dd6 100644 --- a/x-pack/platform/plugins/private/file_upload/server/routes.ts +++ b/x-pack/platform/plugins/private/file_upload/server/routes.ts @@ -6,14 +6,8 @@ */ import { schema } from '@kbn/config-schema'; -import type { IScopedClusterClient } from '@kbn/core/server'; import type { CoreSetup, Logger } from '@kbn/core/server'; -import type { - IndicesIndexSettings, - MappingTypeMapping, -} from '@elastic/elasticsearch/lib/api/typesWithBodyKey'; import { MAX_FILE_SIZE_BYTES, MAX_TIKA_FILE_SIZE_BYTES } from '../common/constants'; -import type { IngestPipelineWrapper, InputData } from '../common/types'; import { wrapError } from './error_wrapper'; import { importDataProvider } from './import_data'; import { getTimeFieldRange } from './get_time_field_range'; @@ -22,28 +16,15 @@ import { analyzeFile } from './analyze_file'; import { updateTelemetry } from './telemetry'; import { importFileBodySchema, - importFileQuerySchema, analyzeFileQuerySchema, runtimeMappingsSchema, + initializeImportFileBodySchema, } from './schemas'; import type { StartDeps } from './types'; import { checkFileUploadPrivileges } from './check_privileges'; import { previewIndexTimeRange } from './preview_index_time_range'; import { previewTikaContents } from './preview_tika_contents'; - -function importData( - client: IScopedClusterClient, - id: string | undefined, - index: string, - settings: IndicesIndexSettings, - mappings: MappingTypeMapping, - ingestPipeline: IngestPipelineWrapper, - createPipelines: IngestPipelineWrapper[], - data: InputData -) { - const { importData: importDataFunc } = importDataProvider(client); - return importDataFunc(id, index, settings, mappings, ingestPipeline, createPipelines, data); -} +import { IngestPipelineWrapper } from '../common/types'; /** * Routes for the file upload. @@ -143,6 +124,59 @@ export function fileUploadRoutes(coreSetup: CoreSetup, logge } ); + /** + * @apiGroup FileDataVisualizer + * + * @api {post} /internal/file_upload/initialize_import Initialize import file process + * @apiName InitializeImportFile + * @apiDescription Creates an index and ingest pipelines for importing file data. + * + * @apiSchema (body) initializeImportFileBodySchema + */ + router.versioned + .post({ + path: '/internal/file_upload/initialize_import', + access: 'internal', + options: { + body: { + accepts: ['application/json'], + maxBytes: MAX_FILE_SIZE_BYTES, + }, + }, + }) + .addVersion( + { + version: '1', + security: { + authz: { + enabled: false, + reason: + 'This route is opted out from authorization because permissions will be checked by elasticsearch', + }, + }, + validate: { + request: { + body: initializeImportFileBodySchema, + }, + }, + }, + async (context, request, response) => { + try { + const { index, settings, mappings, ingestPipelines } = request.body; + const esClient = (await context.core).elasticsearch.client; + + await updateTelemetry(); + + const { initializeImport } = importDataProvider(esClient); + const result = await initializeImport(index, settings, mappings, ingestPipelines); + + return response.ok({ body: result }); + } catch (e) { + return response.customError(wrapError(e)); + } + } + ); + /** * @apiGroup FileDataVisualizer * @@ -150,7 +184,6 @@ export function fileUploadRoutes(coreSetup: CoreSetup, logge * @apiName ImportFile * @apiDescription Imports file data into elasticsearch index. * - * @apiSchema (query) importFileQuerySchema * @apiSchema (body) importFileBodySchema */ router.versioned @@ -176,8 +209,33 @@ export function fileUploadRoutes(coreSetup: CoreSetup, logge }, validate: { request: { - query: importFileQuerySchema, - body: importFileBodySchema, + query: schema.object({ + id: schema.maybe(schema.string()), + }), + body: schema.object({ + index: schema.string(), + data: schema.arrayOf(schema.any()), + settings: schema.maybe(schema.any()), + /** Mappings */ + mappings: schema.any(), + /** Ingest pipeline definition */ + ingestPipeline: schema.maybe( + schema.object({ + id: schema.maybe(schema.string()), + pipeline: schema.maybe(schema.any()), + }) + ), + createPipelines: schema.maybe( + schema.arrayOf( + schema.maybe( + schema.object({ + id: schema.maybe(schema.string()), + pipeline: schema.maybe(schema.any()), + }) + ) + ) + ), + }), }, }, }, @@ -187,24 +245,57 @@ export function fileUploadRoutes(coreSetup: CoreSetup, logge const { index, data, settings, mappings, ingestPipeline, createPipelines } = request.body; const esClient = (await context.core).elasticsearch.client; - // `id` being `undefined` tells us that this is a new import due to create a new index. - // follow-up import calls to just add additional data will include the `id` of the created - // index, we'll ignore those and don't increment the counter. + const { initializeImport, importData } = importDataProvider(esClient); + if (id === undefined) { - await updateTelemetry(); + const pipelines = [ + ...(ingestPipeline ? [ingestPipeline] : []), + ...(createPipelines ?? []), + ] as IngestPipelineWrapper[]; + + const result = await initializeImport(index, settings, mappings, pipelines); + // format the response to match v1 response + const body = { + id: 'tempId', + index: result.index, + pipelineId: result.pipelineIds[0], + success: result.success, + }; + return response.ok({ body }); } - const result = await importData( - esClient, - id, - index, - settings, - mappings, - // @ts-expect-error - ingestPipeline, - createPipelines, - data - ); + const result = await importData(index, ingestPipeline?.id ?? '', data); + + return response.ok({ body: result }); + } catch (e) { + return response.customError(wrapError(e)); + } + } + ) + .addVersion( + { + version: '2', + security: { + authz: { + enabled: false, + reason: + 'This route is opted out from authorization because permissions will be checked by elasticsearch', + }, + }, + validate: { + request: { + body: importFileBodySchema, + }, + }, + }, + async (context, request, response) => { + try { + const { index, data, ingestPipelineId } = request.body; + const esClient = (await context.core).elasticsearch.client; + + const { importData } = importDataProvider(esClient); + const result = await importData(index, ingestPipelineId, data); + return response.ok({ body: result }); } catch (e) { return response.customError(wrapError(e)); diff --git a/x-pack/platform/plugins/private/file_upload/server/schemas.ts b/x-pack/platform/plugins/private/file_upload/server/schemas.ts index f59a656576b94..fb09272338853 100644 --- a/x-pack/platform/plugins/private/file_upload/server/schemas.ts +++ b/x-pack/platform/plugins/private/file_upload/server/schemas.ts @@ -26,26 +26,25 @@ export const analyzeFileQuerySchema = schema.object({ timestamp_format: schema.maybe(schema.string()), }); -export const importFileQuerySchema = schema.object({ - id: schema.maybe(schema.string()), +const ingestPipeline = schema.object({ + id: schema.string(), + pipeline: schema.any(), }); -const ingestPipeline = schema.maybe( - schema.object({ - id: schema.maybe(schema.string()), - pipeline: schema.maybe(schema.any()), - }) -); - -export const importFileBodySchema = schema.object({ +export const initializeImportFileBodySchema = schema.object({ index: schema.string(), - data: schema.arrayOf(schema.any()), + /* Index settings */ settings: schema.maybe(schema.any()), /** Mappings */ mappings: schema.any(), /** Ingest pipeline definition */ - ingestPipeline, - createPipelines: schema.maybe(schema.arrayOf(ingestPipeline)), + ingestPipelines: schema.arrayOf(ingestPipeline), +}); + +export const importFileBodySchema = schema.object({ + index: schema.string(), + data: schema.arrayOf(schema.any()), + ingestPipelineId: schema.string(), }); export const runtimeMappingsSchema = schema.object( From 72e2ff5950c616a22d4667d139f36cf8209af58a Mon Sep 17 00:00:00 2001 From: James Gowdy Date: Wed, 12 Feb 2025 16:14:24 +0000 Subject: [PATCH 2/7] clean up --- .../components/import_view/import.ts | 2 +- .../file_upload/public/importer/importer.ts | 30 +++++++++---------- 2 files changed, 15 insertions(+), 17 deletions(-) diff --git a/x-pack/platform/plugins/private/data_visualizer/public/application/file_data_visualizer/components/import_view/import.ts b/x-pack/platform/plugins/private/data_visualizer/public/application/file_data_visualizer/components/import_view/import.ts index 4a52e01ce8939..780e87cbf7fef 100644 --- a/x-pack/platform/plugins/private/data_visualizer/public/application/file_data_visualizer/components/import_view/import.ts +++ b/x-pack/platform/plugins/private/data_visualizer/public/application/file_data_visualizer/components/import_view/import.ts @@ -203,7 +203,7 @@ export async function importData(props: Props, config: Config, setState: (state: }); if (createPipeline) { - const pipelinesCreated = !!initializeImportResp.pipelineIds?.length; + const pipelinesCreated = initializeImportResp.pipelineIds.length > 0; if (indexCreated) { setState({ ingestPipelineCreatedStatus: pipelinesCreated diff --git a/x-pack/platform/plugins/private/file_upload/public/importer/importer.ts b/x-pack/platform/plugins/private/file_upload/public/importer/importer.ts index 34c1fcc721794..01bad9d0bf31b 100644 --- a/x-pack/platform/plugins/private/file_upload/public/importer/importer.ts +++ b/x-pack/platform/plugins/private/file_upload/public/importer/importer.ts @@ -84,23 +84,21 @@ export abstract class Importer implements IImporter { protected abstract _createDocs(t: string, isLastPart: boolean): CreateDocsResponse; private _initialize(index: string, mappings: MappingTypeMapping, pipelines: IngestPipeline[]) { - if (pipelines !== undefined) { - for (let i = 0; i < pipelines.length; i++) { - const pipeline = pipelines[i]; - updatePipelineTimezone(pipeline); - - if (pipelineContainsSpecialProcessors(pipeline)) { - // pipeline contains processors which we know are slow - // so reduce the chunk size significantly to avoid timeouts - this._chunkSize = REDUCED_CHUNK_SIZE; - } - // if no pipeline has been supplied, - // send an empty object - this._pipelines.push({ - id: `${index}-${i}-pipeline`, - pipeline, - }); + for (let i = 0; i < pipelines.length; i++) { + const pipeline = pipelines[i]; + updatePipelineTimezone(pipeline); + + if (pipelineContainsSpecialProcessors(pipeline)) { + // pipeline contains processors which we know are slow + // so reduce the chunk size significantly to avoid timeouts + this._chunkSize = REDUCED_CHUNK_SIZE; } + // if no pipeline has been supplied, + // send an empty object + this._pipelines.push({ + id: `${index}-${i}-pipeline`, + pipeline, + }); } this._index = index; From 8e023eb9552751f8bd4692c785e2c6796a998d88 Mon Sep 17 00:00:00 2001 From: James Gowdy Date: Wed, 12 Feb 2025 16:51:24 +0000 Subject: [PATCH 3/7] translations --- .../public/importer/geo/abstract_geo_file_importer.tsx | 4 ++-- .../plugins/private/file_upload/public/importer/importer.ts | 2 +- .../plugins/private/translations/translations/fr-FR.json | 1 - .../plugins/private/translations/translations/ja-JP.json | 1 - .../plugins/private/translations/translations/zh-CN.json | 1 - 5 files changed, 3 insertions(+), 6 deletions(-) diff --git a/x-pack/platform/plugins/private/file_upload/public/importer/geo/abstract_geo_file_importer.tsx b/x-pack/platform/plugins/private/file_upload/public/importer/geo/abstract_geo_file_importer.tsx index 03fb092d1504a..80a0e51d0bcd3 100644 --- a/x-pack/platform/plugins/private/file_upload/public/importer/geo/abstract_geo_file_importer.tsx +++ b/x-pack/platform/plugins/private/file_upload/public/importer/geo/abstract_geo_file_importer.tsx @@ -88,8 +88,8 @@ export class AbstractGeoFileImporter extends Importer implements GeoFileImporter if (!index) { return { success: false, - error: i18n.translate('xpack.fileUpload.import.noIdOrIndexSuppliedErrorMessage', { - defaultMessage: 'no ID or index supplied', + error: i18n.translate('xpack.fileUpload.import.noIndexSuppliedErrorMessage', { + defaultMessage: 'No index supplied', }), }; } diff --git a/x-pack/platform/plugins/private/file_upload/public/importer/importer.ts b/x-pack/platform/plugins/private/file_upload/public/importer/importer.ts index 01bad9d0bf31b..5ab0ace637bab 100644 --- a/x-pack/platform/plugins/private/file_upload/public/importer/importer.ts +++ b/x-pack/platform/plugins/private/file_upload/public/importer/importer.ts @@ -146,7 +146,7 @@ export abstract class Importer implements IImporter { if (!index) { return { success: false, - error: i18n.translate('xpack.fileUpload.import.noIdOrIndexSuppliedErrorMessage', { + error: i18n.translate('xpack.fileUpload.import.noIndexSuppliedErrorMessage', { defaultMessage: 'No index supplied', }), }; diff --git a/x-pack/platform/plugins/private/translations/translations/fr-FR.json b/x-pack/platform/plugins/private/translations/translations/fr-FR.json index 51cbead1f0558..dd5910ef94553 100644 --- a/x-pack/platform/plugins/private/translations/translations/fr-FR.json +++ b/x-pack/platform/plugins/private/translations/translations/fr-FR.json @@ -17476,7 +17476,6 @@ "xpack.fileUpload.geoUploadWizard.outOfTotalMsg": "sur {totalFeaturesCount}", "xpack.fileUpload.geoUploadWizard.partialImportMsg": "Impossible d'indexer {failedFeaturesCount} {outOfTotalMsg} fonctionnalités.", "xpack.fileUpload.geoUploadWizard.writingToIndex": "Écriture dans l'index : {progress} % terminé", - "xpack.fileUpload.import.noIdOrIndexSuppliedErrorMessage": "aucun ID ni index fournis", "xpack.fileUpload.importComplete.copyButtonAriaLabel": "Copier dans le presse-papiers", "xpack.fileUpload.importComplete.dataViewResponse": "Réponse de la vue de données", "xpack.fileUpload.importComplete.indexingResponse": "Importer la réponse", diff --git a/x-pack/platform/plugins/private/translations/translations/ja-JP.json b/x-pack/platform/plugins/private/translations/translations/ja-JP.json index 755d1730a1647..18a9f6f69907e 100644 --- a/x-pack/platform/plugins/private/translations/translations/ja-JP.json +++ b/x-pack/platform/plugins/private/translations/translations/ja-JP.json @@ -17335,7 +17335,6 @@ "xpack.fileUpload.geoUploadWizard.outOfTotalMsg": "{totalFeaturesCount}件中", "xpack.fileUpload.geoUploadWizard.partialImportMsg": "{failedFeaturesCount} {outOfTotalMsg}個の特徴量をインデックス化できません", "xpack.fileUpload.geoUploadWizard.writingToIndex": "インデックスに書き込み中:{progress}%完了", - "xpack.fileUpload.import.noIdOrIndexSuppliedErrorMessage": "ID またはインデックスが提供されていません", "xpack.fileUpload.importComplete.copyButtonAriaLabel": "クリップボードにコピー", "xpack.fileUpload.importComplete.dataViewResponse": "データビュー応答", "xpack.fileUpload.importComplete.indexingResponse": "応答をインポート", diff --git a/x-pack/platform/plugins/private/translations/translations/zh-CN.json b/x-pack/platform/plugins/private/translations/translations/zh-CN.json index b4549821e6d10..75aa793605084 100644 --- a/x-pack/platform/plugins/private/translations/translations/zh-CN.json +++ b/x-pack/platform/plugins/private/translations/translations/zh-CN.json @@ -17059,7 +17059,6 @@ "xpack.fileUpload.geoUploadWizard.outOfTotalMsg": ",共 {totalFeaturesCount} 个", "xpack.fileUpload.geoUploadWizard.partialImportMsg": "无法索引 {failedFeaturesCount} 个 {outOfTotalMsg} 特征。", "xpack.fileUpload.geoUploadWizard.writingToIndex": "正在写入索引:已完成 {progress}%", - "xpack.fileUpload.import.noIdOrIndexSuppliedErrorMessage": "未提供任何 ID 或索引", "xpack.fileUpload.importComplete.copyButtonAriaLabel": "复制到剪贴板", "xpack.fileUpload.importComplete.dataViewResponse": "数据视图响应", "xpack.fileUpload.importComplete.indexingResponse": "导入响应", From c9efba98ff33a962fdaf6310fb92af9e6deafd8a Mon Sep 17 00:00:00 2001 From: James Gowdy Date: Wed, 12 Feb 2025 19:19:05 +0000 Subject: [PATCH 4/7] bumping api version --- .../plugins/private/file_upload/public/importer/routes.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/x-pack/platform/plugins/private/file_upload/public/importer/routes.ts b/x-pack/platform/plugins/private/file_upload/public/importer/routes.ts index fb2d4307c65be..88b22e801272c 100644 --- a/x-pack/platform/plugins/private/file_upload/public/importer/routes.ts +++ b/x-pack/platform/plugins/private/file_upload/public/importer/routes.ts @@ -61,7 +61,7 @@ export function callImportRoute({ index, data, ingestPipelineId }: CallImportRou return getHttp().fetch({ path: `/internal/file_upload/import`, method: 'POST', - version: '1', + version: '2', body, }); } From bfeafb0dc39d06004866e17f1012a6c6ca7e17c8 Mon Sep 17 00:00:00 2001 From: James Gowdy Date: Thu, 13 Feb 2025 16:16:54 +0000 Subject: [PATCH 5/7] fix for files with no pipeline --- .../public/lite/file_manager/file_manager.ts | 30 ++++++++++++------ .../public/lite/file_manager/file_wrapper.ts | 14 +++------ .../private/file_upload/common/types.ts | 4 +-- .../public/components/geo_upload_wizard.tsx | 7 ++--- .../file_upload/public/importer/importer.ts | 31 +++++++++++++------ .../file_upload/public/importer/types.ts | 6 ++-- .../private/file_upload/server/import_data.ts | 8 +++-- .../private/file_upload/server/schemas.ts | 4 +-- 8 files changed, 61 insertions(+), 43 deletions(-) diff --git a/x-pack/platform/plugins/private/data_visualizer/public/lite/file_manager/file_manager.ts b/x-pack/platform/plugins/private/data_visualizer/public/lite/file_manager/file_manager.ts index c62384ade586d..1761685473321 100644 --- a/x-pack/platform/plugins/private/data_visualizer/public/lite/file_manager/file_manager.ts +++ b/x-pack/platform/plugins/private/data_visualizer/public/lite/file_manager/file_manager.ts @@ -67,7 +67,7 @@ export class FileManager { private mappingsCheckSubscription: Subscription; private settings; private mappings: MappingTypeMapping | null = null; - private pipelines: IngestPipeline[] | null = null; + private pipelines: Array = []; private inferenceId: string | null = null; private importer: IImporter | null = null; private timeFieldName: string | undefined | null = null; @@ -224,7 +224,7 @@ export class FileManager { return createMergedMappings(files); } - private getPipelines(): IngestPipeline[] { + private getPipelines(): Array { const files = this.getFiles(); return files.map((file) => file.getPipeline()); } @@ -259,9 +259,11 @@ export class FileManager { }); } + const createPipelines = this.pipelines.length > 0; + this.setStatus({ indexCreated: STATUS.STARTED, - pipelineCreated: STATUS.STARTED, + pipelineCreated: createPipelines ? STATUS.STARTED : STATUS.NA, }); let indexCreated = false; @@ -277,10 +279,12 @@ export class FileManager { ); this.timeFieldName = this.importer.getTimeField(); indexCreated = initializeImportResp.index !== undefined; - pipelinesCreated = !!initializeImportResp.pipelineIds?.length; + pipelinesCreated = initializeImportResp.pipelineIds.length > 0; this.setStatus({ indexCreated: indexCreated ? STATUS.COMPLETED : STATUS.FAILED, - pipelineCreated: pipelinesCreated ? STATUS.COMPLETED : STATUS.FAILED, + ...(createPipelines + ? { pipelineCreated: pipelinesCreated ? STATUS.COMPLETED : STATUS.FAILED } + : {}), }); if (initializeImportResp.error) { @@ -301,7 +305,11 @@ export class FileManager { return null; } - if (!indexCreated || !pipelinesCreated || !initializeImportResp) { + if ( + indexCreated === false || + (createPipelines && pipelinesCreated === false) || + !initializeImportResp + ) { return null; } @@ -311,11 +319,12 @@ export class FileManager { // import data const files = this.getFiles(); + const createdPipelineIds = initializeImportResp.pipelineIds; try { await Promise.all( files.map(async (file, i) => { - await file.import(indexName, this.mappings!, `${indexName}-${i}-pipeline`); + await file.import(indexName, this.mappings!, createdPipelineIds[i] ?? undefined); }) ); } catch (error) { @@ -342,9 +351,7 @@ export class FileManager { this.setStatus({ pipelinesDeleted: STATUS.STARTED, }); - await this.importer.deletePipelines( - this.pipelines.map((p, i) => `${indexName}-${i}-pipeline`) - ); + await this.importer.deletePipelines(); this.setStatus({ pipelinesDeleted: STATUS.COMPLETED, }); @@ -458,6 +465,9 @@ export class FileManager { }; this.pipelines.forEach((pipeline) => { + if (pipeline === undefined) { + return; + } pipeline.processors.push({ set: { field: 'content', diff --git a/x-pack/platform/plugins/private/data_visualizer/public/lite/file_manager/file_wrapper.ts b/x-pack/platform/plugins/private/data_visualizer/public/lite/file_manager/file_wrapper.ts index 3460798bdc2c4..8d3cc4e445847 100644 --- a/x-pack/platform/plugins/private/data_visualizer/public/lite/file_manager/file_wrapper.ts +++ b/x-pack/platform/plugins/private/data_visualizer/public/lite/file_manager/file_wrapper.ts @@ -169,13 +169,8 @@ export class FileWrapper { public getMappings() { return this.analyzedFile$.getValue().results?.mappings; } - public getPipeline(): IngestPipeline { - return ( - this.analyzedFile$.getValue().results?.ingest_pipeline ?? { - description: '', - processors: [], - } - ); + public getPipeline(): IngestPipeline | undefined { + return this.analyzedFile$.getValue().results?.ingest_pipeline; } public getFormat() { return this.analyzedFile$.getValue().results?.format; @@ -184,7 +179,7 @@ export class FileWrapper { return this.analyzedFile$.getValue().data; } - public async import(index: string, mappings: MappingTypeMapping, pipelineId: string) { + public async import(index: string, mappings: MappingTypeMapping, pipelineId: string | undefined) { this.setStatus({ importStatus: STATUS.STARTED }); const format = this.analyzedFile$.getValue().results!.format; const importer = await this.fileUpload.importerFactory(format, { @@ -192,7 +187,8 @@ export class FileWrapper { multilineStartPattern: this.analyzedFile$.getValue().results!.multiline_start_pattern, }); - importer.initializeWithoutCreate(index, mappings, [this.getPipeline()]); + const ingestPipeline = this.getPipeline(); + importer.initializeWithoutCreate(index, mappings, ingestPipeline ? [ingestPipeline] : []); const data = this.getData(); if (data === null) { this.setStatus({ importStatus: STATUS.FAILED }); diff --git a/x-pack/platform/plugins/private/file_upload/common/types.ts b/x-pack/platform/plugins/private/file_upload/common/types.ts index 9064c437b121c..60f9c42090084 100644 --- a/x-pack/platform/plugins/private/file_upload/common/types.ts +++ b/x-pack/platform/plugins/private/file_upload/common/types.ts @@ -102,7 +102,7 @@ export interface InitializeImportResponse { success: boolean; id: string; index: string; - pipelineIds: string[]; + pipelineIds: Array; error?: { error: estypes.ErrorCause; }; @@ -111,7 +111,7 @@ export interface InitializeImportResponse { export interface ImportResponse { success: boolean; index: string; - pipelineId: string; + pipelineId?: string; docCount: number; failures: ImportFailure[]; error?: { diff --git a/x-pack/platform/plugins/private/file_upload/public/components/geo_upload_wizard.tsx b/x-pack/platform/plugins/private/file_upload/public/components/geo_upload_wizard.tsx index 62afd84117b06..53ac52009cfb4 100644 --- a/x-pack/platform/plugins/private/file_upload/public/components/geo_upload_wizard.tsx +++ b/x-pack/platform/plugins/private/file_upload/public/components/geo_upload_wizard.tsx @@ -109,10 +109,7 @@ export class GeoUploadWizard extends Component }, }, }; - const ingestPipeline = { - description: '', - processors: [], - }; + this.setState({ importStatus: i18n.translate('xpack.fileUpload.geoUploadWizard.dataIndexingStarted', { defaultMessage: 'Creating index: {indexName}', @@ -125,7 +122,7 @@ export class GeoUploadWizard extends Component this.state.indexName, {}, mappings, - [ingestPipeline] + [] ); if (!this._isMounted) { return; diff --git a/x-pack/platform/plugins/private/file_upload/public/importer/importer.ts b/x-pack/platform/plugins/private/file_upload/public/importer/importer.ts index 5ab0ace637bab..af356db4f601b 100644 --- a/x-pack/platform/plugins/private/file_upload/public/importer/importer.ts +++ b/x-pack/platform/plugins/private/file_upload/public/importer/importer.ts @@ -83,18 +83,23 @@ export abstract class Importer implements IImporter { protected abstract _createDocs(t: string, isLastPart: boolean): CreateDocsResponse; - private _initialize(index: string, mappings: MappingTypeMapping, pipelines: IngestPipeline[]) { + private _initialize( + index: string, + mappings: MappingTypeMapping, + pipelines: Array + ) { for (let i = 0; i < pipelines.length; i++) { const pipeline = pipelines[i]; - updatePipelineTimezone(pipeline); + if (pipeline !== undefined) { + updatePipelineTimezone(pipeline); - if (pipelineContainsSpecialProcessors(pipeline)) { - // pipeline contains processors which we know are slow - // so reduce the chunk size significantly to avoid timeouts - this._chunkSize = REDUCED_CHUNK_SIZE; + if (pipelineContainsSpecialProcessors(pipeline)) { + // pipeline contains processors which we know are slow + // so reduce the chunk size significantly to avoid timeouts + this._chunkSize = REDUCED_CHUNK_SIZE; + } } - // if no pipeline has been supplied, - // send an empty object + this._pipelines.push({ id: `${index}-${i}-pipeline`, pipeline, @@ -118,7 +123,7 @@ export abstract class Importer implements IImporter { index: string, settings: IndicesIndexSettings, mappings: MappingTypeMapping, - pipelines: IngestPipeline[] + pipelines: Array ) { this._initialize(index, mappings, pipelines); @@ -253,8 +258,14 @@ export abstract class Importer implements IImporter { } public async deletePipelines() { + const ids = this._pipelines.filter((p) => p.pipeline !== undefined).map((p) => p.id); + + if (ids.length === 0) { + return []; + } + return await getHttp().fetch({ - path: `/internal/file_upload/remove_pipelines/${this._pipelines.map((p) => p.id).join(',')}`, + path: `/internal/file_upload/remove_pipelines/${ids.join(',')}`, method: 'DELETE', version: '1', }); diff --git a/x-pack/platform/plugins/private/file_upload/public/importer/types.ts b/x-pack/platform/plugins/private/file_upload/public/importer/types.ts index bcf0a073e663e..659436f393d53 100644 --- a/x-pack/platform/plugins/private/file_upload/public/importer/types.ts +++ b/x-pack/platform/plugins/private/file_upload/public/importer/types.ts @@ -49,7 +49,7 @@ export interface IImporter { index: string, settings: IndicesIndexSettings, mappings: MappingTypeMapping, - pipeline: IngestPipeline[] + pipeline: Array ): Promise; initializeWithoutCreate( index: string, @@ -58,12 +58,12 @@ export interface IImporter { ): void; import( index: string, - ingestPipelineId: string, + ingestPipelineId: string | undefined, setImportProgress: (progress: number) => void ): Promise; initialized(): boolean; getIndex(): string | undefined; getTimeField(): string | undefined; previewIndexTimeRange(): Promise<{ start: number | null; end: number | null }>; - deletePipelines(pipelineIds: string[]): Promise; + deletePipelines(): Promise; } diff --git a/x-pack/platform/plugins/private/file_upload/server/import_data.ts b/x-pack/platform/plugins/private/file_upload/server/import_data.ts index fba3858dd83a8..50b2b3a7e4a19 100644 --- a/x-pack/platform/plugins/private/file_upload/server/import_data.ts +++ b/x-pack/platform/plugins/private/file_upload/server/import_data.ts @@ -29,7 +29,7 @@ export function importDataProvider({ asCurrentUser }: IScopedClusterClient) { ingestPipelines: IngestPipelineWrapper[] ): Promise { let createdIndex; - const createdPipelineIds: string[] = []; + const createdPipelineIds: Array = []; const id = generateId(); try { await createIndex(index, settings, mappings); @@ -38,6 +38,10 @@ export function importDataProvider({ asCurrentUser }: IScopedClusterClient) { // create the pipeline if one has been supplied if (ingestPipelines !== undefined) { for (const p of ingestPipelines) { + if (p.pipeline === undefined) { + createdPipelineIds.push(undefined); + continue; + } const resp = await createPipeline(p.id, p.pipeline); createdPipelineIds.push(p.id); if (resp.acknowledged !== true) { @@ -65,7 +69,7 @@ export function importDataProvider({ asCurrentUser }: IScopedClusterClient) { async function importData( index: string, - ingestPipelineId: string, + ingestPipelineId: string | undefined, data: InputData ): Promise { const docCount = data.length; diff --git a/x-pack/platform/plugins/private/file_upload/server/schemas.ts b/x-pack/platform/plugins/private/file_upload/server/schemas.ts index fb09272338853..76da1622862ea 100644 --- a/x-pack/platform/plugins/private/file_upload/server/schemas.ts +++ b/x-pack/platform/plugins/private/file_upload/server/schemas.ts @@ -28,7 +28,7 @@ export const analyzeFileQuerySchema = schema.object({ const ingestPipeline = schema.object({ id: schema.string(), - pipeline: schema.any(), + pipeline: schema.maybe(schema.any()), }); export const initializeImportFileBodySchema = schema.object({ @@ -44,7 +44,7 @@ export const initializeImportFileBodySchema = schema.object({ export const importFileBodySchema = schema.object({ index: schema.string(), data: schema.arrayOf(schema.any()), - ingestPipelineId: schema.string(), + ingestPipelineId: schema.maybe(schema.string()), }); export const runtimeMappingsSchema = schema.object( From 3b340af3971c6444a0e649ab69a0b22f1393d6bf Mon Sep 17 00:00:00 2001 From: James Gowdy Date: Fri, 21 Feb 2025 10:08:37 +0000 Subject: [PATCH 6/7] import type --- .../plugins/private/file_upload/public/importer/routes.ts | 4 ++-- .../plugins/private/file_upload/server/import_data.ts | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/x-pack/platform/plugins/private/file_upload/public/importer/routes.ts b/x-pack/platform/plugins/private/file_upload/public/importer/routes.ts index 88b22e801272c..ca08988be106a 100644 --- a/x-pack/platform/plugins/private/file_upload/public/importer/routes.ts +++ b/x-pack/platform/plugins/private/file_upload/public/importer/routes.ts @@ -5,11 +5,11 @@ * 2.0. */ -import { +import type { IndicesIndexSettings, MappingTypeMapping, } from '@elastic/elasticsearch/lib/api/typesWithBodyKey'; -import { +import type { IngestPipelineWrapper, InitializeImportResponse, ImportDoc, diff --git a/x-pack/platform/plugins/private/file_upload/server/import_data.ts b/x-pack/platform/plugins/private/file_upload/server/import_data.ts index 50b2b3a7e4a19..790d7e74ee0ec 100644 --- a/x-pack/platform/plugins/private/file_upload/server/import_data.ts +++ b/x-pack/platform/plugins/private/file_upload/server/import_data.ts @@ -5,7 +5,7 @@ * 2.0. */ -import { IScopedClusterClient } from '@kbn/core/server'; +import type { IScopedClusterClient } from '@kbn/core/server'; import type { BulkRequest, IndicesCreateRequest, @@ -13,7 +13,7 @@ import type { MappingTypeMapping, } from '@elastic/elasticsearch/lib/api/typesWithBodyKey'; import { INDEX_META_DATA_CREATED_BY } from '../common/constants'; -import { +import type { ImportResponse, ImportFailure, InputData, From b379719a427832731595ea4dd3db69a6f64e37a9 Mon Sep 17 00:00:00 2001 From: James Gowdy Date: Tue, 25 Feb 2025 20:50:41 +0000 Subject: [PATCH 7/7] fixing types --- .../plugins/private/file_upload/public/importer/routes.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/x-pack/platform/plugins/private/file_upload/public/importer/routes.ts b/x-pack/platform/plugins/private/file_upload/public/importer/routes.ts index ca08988be106a..5987344168906 100644 --- a/x-pack/platform/plugins/private/file_upload/public/importer/routes.ts +++ b/x-pack/platform/plugins/private/file_upload/public/importer/routes.ts @@ -8,7 +8,7 @@ import type { IndicesIndexSettings, MappingTypeMapping, -} from '@elastic/elasticsearch/lib/api/typesWithBodyKey'; +} from '@elastic/elasticsearch/lib/api/types'; import type { IngestPipelineWrapper, InitializeImportResponse,