"use strict"; var _interopRequireDefault = require("@babel/runtime/helpers/interopRequireDefault"); Object.defineProperty(exports, "__esModule", { value: true }); exports.MAX_CHUNK_CHAR_COUNT = exports.Importer = exports.IMPORT_RETRIES = void 0; exports.callImportRoute = callImportRoute; var _defineProperty2 = _interopRequireDefault(require("@babel/runtime/helpers/defineProperty")); var _lodash = require("lodash"); var _moment = _interopRequireDefault(require("moment")); var _i18n = require("@kbn/i18n"); var _mlIsPopulatedObject = require("@kbn/ml-is-populated-object"); var _kibana_services = require("../kibana_services"); var _constants = require("../../common/constants"); /* * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one * or more contributor license agreements. Licensed under the Elastic License * 2.0; you may not use this file except in compliance with the Elastic License * 2.0. */ const CHUNK_SIZE = 5000; const REDUCED_CHUNK_SIZE = 100; const MAX_CHUNK_CHAR_COUNT = 1000000; exports.MAX_CHUNK_CHAR_COUNT = MAX_CHUNK_CHAR_COUNT; const IMPORT_RETRIES = 5; exports.IMPORT_RETRIES = IMPORT_RETRIES; const STRING_CHUNKS_MB = 100; class Importer { constructor() { (0, _defineProperty2.default)(this, "_docArray", []); (0, _defineProperty2.default)(this, "_chunkSize", CHUNK_SIZE); } read(data) { const decoder = new TextDecoder(); const size = STRING_CHUNKS_MB * _constants.MB; // chop the data up into 100MB chunks for processing. // if the chop produces a partial line at the end, a character "remainder" count // is returned which is used to roll the next chunk back that many chars so // it is included in the next chunk. const parts = Math.ceil(data.byteLength / size); let remainder = 0; for (let i = 0; i < parts; i++) { const byteArray = decoder.decode(data.slice(i * size - remainder, (i + 1) * size)); const { success, docs, remainder: tempRemainder } = this._createDocs(byteArray, i === parts - 1); if (success) { this._docArray = this._docArray.concat(docs); remainder = tempRemainder; } else { return { success: false }; } } return { success: true }; } async initializeImport(index, settings, mappings, pipeline) { updatePipelineTimezone(pipeline); if (pipelineContainsSpecialProcessors(pipeline)) { // pipeline contains processors which we know are slow // so reduce the chunk size significantly to avoid timeouts this._chunkSize = REDUCED_CHUNK_SIZE; } // if no pipeline has been supplied, // send an empty object const ingestPipeline = pipeline !== undefined ? 
      ? { id: `${index}-pipeline`, pipeline }
      : {};

    return await callImportRoute({
      id: undefined,
      index,
      data: [],
      settings,
      mappings,
      ingestPipeline
    });
  }

  async import(id, index, pipelineId, setImportProgress) {
    if (!id || !index) {
      return {
        success: false,
        error: _i18n.i18n.translate('xpack.fileUpload.import.noIdOrIndexSuppliedErrorMessage', {
          defaultMessage: 'no ID or index supplied'
        })
      };
    }

    const chunks = createDocumentChunks(this._docArray, this._chunkSize);
    const ingestPipeline = {
      id: pipelineId
    };
    let success = true;
    const failures = [];
    let error;

    for (let i = 0; i < chunks.length; i++) {
      let retries = IMPORT_RETRIES;
      let resp = {
        success: false,
        failures: [],
        docCount: 0,
        id: '',
        index: '',
        pipelineId: ''
      };

      while (resp.success === false && retries > 0) {
        try {
          resp = await callImportRoute({
            id,
            index,
            data: chunks[i],
            settings: {},
            mappings: {},
            ingestPipeline
          });
          if (retries < IMPORT_RETRIES) {
            // eslint-disable-next-line no-console
            console.log(`Retrying import ${IMPORT_RETRIES - retries}`);
          }
          retries--;
        } catch (err) {
          resp.success = false;
          resp.error = err;
          retries = 0;
        }
      }

      if (resp.success) {
        setImportProgress(((i + 1) / chunks.length) * 100);
      } else {
        // eslint-disable-next-line no-console
        console.error(resp);
        success = false;
        error = resp.error;
        populateFailures(resp, failures, i, this._chunkSize);
        break;
      }
      populateFailures(resp, failures, i, this._chunkSize);
    }

    const result = {
      success,
      failures,
      docCount: this._docArray.length
    };
    if (success) {
      setImportProgress(100);
    } else {
      result.error = error;
    }
    return result;
  }
}
exports.Importer = Importer;

function populateFailures(error, failures, chunkCount, chunkSize) {
  if (error.failures && error.failures.length) {
    // update the item value to include the chunk count
    // e.g. item 3 in chunk 2 is actually item 20003
    for (let f = 0; f < error.failures.length; f++) {
      const failure = error.failures[f];
      failure.item = failure.item + chunkSize * chunkCount;
    }
    failures.push(...error.failures);
  }
}

// The file structure endpoint sets the timezone to be {{ event.timezone }}
// as that's the variable Filebeat would send the client timezone in.
// In this data import function the UI is effectively performing the role of Filebeat,
// i.e. doing basic parsing, processing and conversion to JSON before forwarding to the ingest pipeline.
// But it's not sending every single field that Filebeat would add, so the ingest pipeline
// cannot look for an event.timezone variable in each input record.
// Therefore we need to replace {{ event.timezone }} with the actual browser timezone
function updatePipelineTimezone(ingestPipeline) {
  if (ingestPipeline !== undefined && ingestPipeline.processors && ingestPipeline.processors.length) {
    const dateProcessor = ingestPipeline.processors.find(
      p => p.date !== undefined && p.date.timezone === '{{ event.timezone }}'
    );
    if (dateProcessor) {
      dateProcessor.date.timezone = _moment.default.tz.guess();
    }
  }
}

function createDocumentChunks(docArray, chunkSize) {
  const chunks = [];
  // chop docArray into chunks
  const tempChunks = (0, _lodash.chunk)(docArray, chunkSize);

  // loop over tempChunks and check that the total character length
  // for each chunk is within the MAX_CHUNK_CHAR_COUNT.
  // if the length is too long, split the chunk into smaller chunks
  // based on how much larger it is than MAX_CHUNK_CHAR_COUNT.
  // note, each document is a different size, so dividing by charCountOfDocs
  // only produces an average chunk size that should be smaller than the max length
  for (let i = 0; i < tempChunks.length; i++) {
    const docs = tempChunks[i];
    const numberOfDocs = docs.length;
    const charCountOfDocs = JSON.stringify(docs).length;
    if (charCountOfDocs > MAX_CHUNK_CHAR_COUNT) {
      // calculate new chunk size which should produce a chunk
      // whose length is on average around MAX_CHUNK_CHAR_COUNT
      const adjustedChunkSize = Math.floor((MAX_CHUNK_CHAR_COUNT / charCountOfDocs) * numberOfDocs);
      const smallerChunks = (0, _lodash.chunk)(docs, adjustedChunkSize);
      chunks.push(...smallerChunks);
    } else {
      chunks.push(docs);
    }
  }
  return chunks;
}

function pipelineContainsSpecialProcessors(pipeline) {
  const findKeys = obj => {
    // return all nested keys in the pipeline
    const keys = [];
    Object.entries(obj).forEach(([key, val]) => {
      keys.push(key);
      if ((0, _mlIsPopulatedObject.isPopulatedObject)(val)) {
        keys.push(...findKeys(val));
      }
    });
    return keys;
  };
  const keys = findKeys(pipeline);
  const specialProcessors = ['inference', 'enrich'];
  return (0, _lodash.intersection)(specialProcessors, keys).length !== 0;
}

function callImportRoute({
  id,
  index,
  data,
  settings,
  mappings,
  ingestPipeline
}) {
  const query = id !== undefined ? { id } : {};
  const body = JSON.stringify({
    index,
    data,
    settings,
    mappings,
    ingestPipeline
  });
  return (0, _kibana_services.getHttp)().fetch({
    path: `/internal/file_upload/import`,
    method: 'POST',
    version: '1',
    query,
    body
  });
}
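
// Usage sketch (illustrative only, not part of the original module). It assumes a
// concrete Importer subclass that implements _createDocs() elsewhere in the plugin
// (named ConcreteImporter here purely for illustration), an ArrayBuffer of file
// contents, and settings/mappings/pipeline objects prepared by the caller. It also
// assumes the import route response includes the generated id and pipelineId, as the
// default resp shape in import() suggests. Kept inside a comment block so loading
// this module stays side-effect free.
//
//   const importer = new ConcreteImporter();             // hypothetical subclass
//   const { success } = importer.read(fileArrayBuffer);  // chunk and parse the raw bytes
//   if (success) {
//     // create the index/pipeline first; reuse the ids from the response
//     const initResp = await importer.initializeImport(index, settings, mappings, pipeline);
//     // index the parsed docs chunk by chunk, reporting progress as 0-100
//     const result = await importer.import(initResp.id, index, initResp.pipelineId, (p) => {
//       console.log(`import progress: ${p}%`);
//     });
//     console.log(`imported ${result.docCount} docs, success: ${result.success}`);
//   }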