"use strict"; Object.defineProperty(exports, "__esModule", { value: true }); exports.reindexServiceFactory = void 0; var _rxjs = require("rxjs"); var _types = require("../../../common/types"); var _es_indices_state_check = require("../es_indices_state_check"); var _index_settings = require("./index_settings"); var _error = require("./error"); /* * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one * or more contributor license agreements. Licensed under the Elastic License * 2.0; you may not use this file except in compliance with the Elastic License * 2.0. */ const reindexServiceFactory = (esClient, actions, log, licensing) => { // ------ Utility functions const cleanupChanges = async reindexOp => { // Cancel reindex task if it was started but not completed if (reindexOp.attributes.lastCompletedStep === _types.ReindexStep.reindexStarted) { var _reindexOp$attributes; await esClient.tasks.cancel({ task_id: (_reindexOp$attributes = reindexOp.attributes.reindexTaskId) !== null && _reindexOp$attributes !== void 0 ? _reindexOp$attributes : undefined }).catch(() => undefined); // Ignore any exceptions trying to cancel (it may have already completed). } // Set index back to writable if we ever got past this point. if (reindexOp.attributes.lastCompletedStep >= _types.ReindexStep.readonly) { await esClient.indices.putSettings({ index: reindexOp.attributes.indexName, body: { blocks: { write: false } } }); } if (reindexOp.attributes.lastCompletedStep >= _types.ReindexStep.newIndexCreated && reindexOp.attributes.lastCompletedStep < _types.ReindexStep.aliasCreated) { await esClient.indices.delete({ index: reindexOp.attributes.newIndexName }); } return reindexOp; }; // ------ Functions used to process the state machine /** * Sets the original index as readonly so new data can be indexed until the reindex * is completed. * @param reindexOp */ const setReadonly = async reindexOp => { const { indexName } = reindexOp.attributes; const putReadonly = await esClient.indices.putSettings({ index: indexName, body: { blocks: { write: true } } }); if (!putReadonly.acknowledged) { throw new Error(`Index could not be set to readonly.`); } return actions.updateReindexOp(reindexOp, { lastCompletedStep: _types.ReindexStep.readonly }); }; /** * Creates a new index with the same mappings and settings as the original index. * @param reindexOp */ const createNewIndex = async reindexOp => { var _createIndex; const { indexName, newIndexName } = reindexOp.attributes; const flatSettings = await actions.getFlatSettings(indexName); if (!flatSettings) { throw _error.error.indexNotFound(`Index ${indexName} does not exist.`); } const { settings, mappings } = (0, _index_settings.transformFlatSettings)(flatSettings); let createIndex; try { createIndex = await esClient.indices.create({ index: newIndexName, body: { settings, mappings } }); } catch (err) { var _err$body, _err$body$error; // If for any reason the new index name generated by the `generateNewIndexName` already // exists (this could happen if kibana is restarted during reindexing), we can just go // ahead with the process without needing to create the index again. // See: https://github.com/elastic/kibana/issues/123816 if ((err === null || err === void 0 ? void 0 : (_err$body = err.body) === null || _err$body === void 0 ? void 0 : (_err$body$error = _err$body.error) === null || _err$body$error === void 0 ? void 0 : _err$body$error.type) !== 'resource_already_exists_exception') { throw err; } } if (createIndex && !((_createIndex = createIndex) !== null && _createIndex !== void 0 && _createIndex.acknowledged)) { throw _error.error.cannotCreateIndex(`Index could not be created: ${newIndexName}`); } return actions.updateReindexOp(reindexOp, { lastCompletedStep: _types.ReindexStep.newIndexCreated }); }; /** * Begins the reindex process via Elasticsearch's Reindex API. * @param reindexOp */ const startReindexing = async reindexOp => { const { indexName, reindexOptions } = reindexOp.attributes; // Where possible, derive reindex options at the last moment before reindexing // to prevent them from becoming stale as they wait in the queue. const indicesState = await (0, _es_indices_state_check.esIndicesStateCheck)(esClient, [indexName]); const shouldOpenAndClose = indicesState[indexName] === 'closed'; if (shouldOpenAndClose) { log.debug(`Detected closed index ${indexName}, opening...`); await esClient.indices.open({ index: indexName }); } const flatSettings = await actions.getFlatSettings(indexName); if (!flatSettings) { throw _error.error.indexNotFound(`Index ${indexName} does not exist.`); } const startReindexResponse = await esClient.reindex({ refresh: true, wait_for_completion: false, body: { source: { index: indexName }, dest: { index: reindexOp.attributes.newIndexName } } }); return actions.updateReindexOp(reindexOp, { lastCompletedStep: _types.ReindexStep.reindexStarted, reindexTaskId: startReindexResponse.task === undefined ? startReindexResponse.task : String(startReindexResponse.task), reindexTaskPercComplete: 0, reindexOptions: { ...(reindexOptions !== null && reindexOptions !== void 0 ? reindexOptions : {}), // Indicate to downstream states whether we opened a closed index that should be // closed again. openAndClose: shouldOpenAndClose } }); }; /** * Polls Elasticsearch's Tasks API to see if the reindex operation has been completed. * @param reindexOp */ const updateReindexStatus = async reindexOp => { var _taskResponse$task$st; const taskId = reindexOp.attributes.reindexTaskId; // Check reindexing task progress const taskResponse = await esClient.tasks.get({ task_id: taskId, wait_for_completion: false }); if (!taskResponse.completed) { // Updated the percent complete const perc = taskResponse.task.status.created / taskResponse.task.status.total; return actions.updateReindexOp(reindexOp, { reindexTaskPercComplete: perc }); } else if (((_taskResponse$task$st = taskResponse.task.status) === null || _taskResponse$task$st === void 0 ? void 0 : _taskResponse$task$st.canceled) === 'by user request') { // Set the status to cancelled reindexOp = await actions.updateReindexOp(reindexOp, { status: _types.ReindexStatus.cancelled }); // Do any other cleanup work necessary reindexOp = await cleanupChanges(reindexOp); } else { var _taskResponse$respons, _taskResponse$respons2; // Check that no failures occurred if ((_taskResponse$respons = taskResponse.response) !== null && _taskResponse$respons !== void 0 && (_taskResponse$respons2 = _taskResponse$respons.failures) !== null && _taskResponse$respons2 !== void 0 && _taskResponse$respons2.length) { // Include the entire task result in the error message. This should be guaranteed // to be JSON-serializable since it just came back from Elasticsearch. throw _error.error.reindexTaskFailed(`Reindexing failed: ${JSON.stringify(taskResponse)}`); } // Update the status reindexOp = await actions.updateReindexOp(reindexOp, { lastCompletedStep: _types.ReindexStep.reindexCompleted, reindexTaskPercComplete: 1 }); } // Delete the task from ES .tasks index const deleteTaskResp = await esClient.delete({ index: '.tasks', id: taskId }); if (deleteTaskResp.result !== 'deleted') { throw _error.error.reindexTaskCannotBeDeleted(`Could not delete reindexing task ${taskId}`); } return reindexOp; }; const getIndexAliases = async indexName => { var _response$indexName$a, _response$indexName; const response = await esClient.indices.getAlias({ index: indexName }); return (_response$indexName$a = (_response$indexName = response[indexName]) === null || _response$indexName === void 0 ? void 0 : _response$indexName.aliases) !== null && _response$indexName$a !== void 0 ? _response$indexName$a : {}; }; /** * Creates an alias that points the old index to the new index, deletes the old index. * @param reindexOp */ const switchAlias = async reindexOp => { const { indexName, newIndexName, reindexOptions } = reindexOp.attributes; const existingAliases = await getIndexAliases(indexName); const extraAliases = Object.keys(existingAliases).map(aliasName => ({ add: { index: newIndexName, alias: aliasName, ...existingAliases[aliasName] } })); const aliasResponse = await esClient.indices.updateAliases({ body: { actions: [{ add: { index: newIndexName, alias: indexName } }, { remove_index: { index: indexName } }, ...extraAliases] } }); if (!aliasResponse.acknowledged) { throw _error.error.cannotCreateIndex(`Index aliases could not be created.`); } if ((reindexOptions === null || reindexOptions === void 0 ? void 0 : reindexOptions.openAndClose) === true) { await esClient.indices.close({ index: indexName }); } return actions.updateReindexOp(reindexOp, { lastCompletedStep: _types.ReindexStep.aliasCreated }); }; // ------ The service itself return { async hasRequiredPrivileges(indexName) { /** * To avoid a circular dependency on Security we use a work around * here to detect whether Security is available and enabled * (i.e., via the licensing plugin). This enables Security to use * functionality exposed through Upgrade Assistant. */ const license = await (0, _rxjs.firstValueFrom)(licensing.license$); const securityFeature = license.getFeature('security'); // If security is disabled or unavailable, return true. if (!securityFeature || !(securityFeature.isAvailable && securityFeature.isEnabled)) { return true; } const names = [indexName, (0, _index_settings.generateNewIndexName)(indexName)]; const sourceName = (0, _index_settings.sourceNameForIndex)(indexName); // if we have re-indexed this in the past, there will be an // underlying alias we will also need to update. if (sourceName !== indexName) { names.push(sourceName); } // Otherwise, query for required privileges for this index. const body = { cluster: ['manage'], index: [{ names, allow_restricted_indices: true, privileges: ['all'] }, { names: ['.tasks'], privileges: ['read', 'delete'] }] }; const resp = await esClient.security.hasPrivileges({ body }); return resp.has_all_requested; }, async detectReindexWarnings(indexName) { const flatSettings = await actions.getFlatSettings(indexName); if (!flatSettings) { return undefined; } else { return [ // By default all reindexing operations will replace an index with an alias (with the same name) // pointing to a newly created "reindexed" index. This is destructive as delete operations originally // done on the index itself will now need to be done to the "reindexed-{indexName}" { warningType: 'replaceIndexWithAlias' }, ...(0, _index_settings.getReindexWarnings)(flatSettings)]; } }, async createReindexOperation(indexName, opts) { const indexExists = await esClient.indices.exists({ index: indexName }); if (!indexExists) { throw _error.error.indexNotFound(`Index ${indexName} does not exist in this cluster.`); } const existingReindexOps = await actions.findReindexOperations(indexName); if (existingReindexOps.total !== 0) { const existingOp = existingReindexOps.saved_objects[0]; if (existingOp.attributes.status === _types.ReindexStatus.failed || existingOp.attributes.status === _types.ReindexStatus.cancelled) { // Delete the existing one if it failed or was cancelled to give a chance to retry. await actions.deleteReindexOp(existingOp); } else { throw _error.error.reindexAlreadyInProgress(`A reindex operation already in-progress for ${indexName}`); } } return actions.createReindexOp(indexName, opts !== null && opts !== void 0 && opts.enqueue ? { queueSettings: { queuedAt: Date.now() } } : undefined); }, async findReindexOperation(indexName) { const findResponse = await actions.findReindexOperations(indexName); // Bail early if it does not exist or there is more than one. if (findResponse.total === 0) { return null; } else if (findResponse.total > 1) { throw _error.error.multipleReindexJobsFound(`More than one reindex operation found for ${indexName}`); } return findResponse.saved_objects[0]; }, async cleanupReindexOperations(indexNames) { const performCleanup = async indexName => { const existingReindexOps = await actions.findReindexOperations(indexName); if (existingReindexOps && existingReindexOps.total !== 0) { const existingOp = existingReindexOps.saved_objects[0]; if (existingOp.attributes.status === _types.ReindexStatus.completed) { // Delete the existing one if its status is completed, but still contains deprecation warnings // example scenario: index was upgraded, but then deleted and restored with an old snapshot await actions.deleteReindexOp(existingOp); } } }; await Promise.all(indexNames.map(performCleanup)); }, findAllByStatus: actions.findAllByStatus, async processNextStep(reindexOp) { return actions.runWhileLocked(reindexOp, async lockedReindexOp => { try { switch (lockedReindexOp.attributes.lastCompletedStep) { case _types.ReindexStep.created: lockedReindexOp = await setReadonly(lockedReindexOp); break; case _types.ReindexStep.readonly: lockedReindexOp = await createNewIndex(lockedReindexOp); break; case _types.ReindexStep.newIndexCreated: lockedReindexOp = await startReindexing(lockedReindexOp); break; case _types.ReindexStep.reindexStarted: lockedReindexOp = await updateReindexStatus(lockedReindexOp); break; case _types.ReindexStep.reindexCompleted: lockedReindexOp = await switchAlias(lockedReindexOp); break; case _types.ReindexStep.aliasCreated: lockedReindexOp = await actions.updateReindexOp(lockedReindexOp, { status: _types.ReindexStatus.completed }); break; default: break; } } catch (e) { log.error(`Reindexing step failed: ${e instanceof Error ? e.stack : e.toString()}`); // Trap the exception and add the message to the object so the UI can display it. lockedReindexOp = await actions.updateReindexOp(lockedReindexOp, { status: _types.ReindexStatus.failed, errorMessage: e.toString() }); // Cleanup any changes, ignoring any errors. lockedReindexOp = await cleanupChanges(lockedReindexOp).catch(err => lockedReindexOp); } return lockedReindexOp; }); }, async pauseReindexOperation(indexName) { const reindexOp = await this.findReindexOperation(indexName); if (!reindexOp) { throw new Error(`No reindex operation found for index ${indexName}`); } return actions.runWhileLocked(reindexOp, async op => { if (op.attributes.status === _types.ReindexStatus.paused) { // Another node already paused the operation, don't do anything return reindexOp; } else if (op.attributes.status !== _types.ReindexStatus.inProgress) { throw new Error(`Reindex operation must be inProgress in order to be paused.`); } return actions.updateReindexOp(op, { status: _types.ReindexStatus.paused }); }); }, async resumeReindexOperation(indexName, opts) { const reindexOp = await this.findReindexOperation(indexName); if (!reindexOp) { throw new Error(`No reindex operation found for index ${indexName}`); } return actions.runWhileLocked(reindexOp, async op => { if (op.attributes.status === _types.ReindexStatus.inProgress) { // Another node already resumed the operation, don't do anything return reindexOp; } else if (op.attributes.status !== _types.ReindexStatus.paused) { throw new Error(`Reindex operation must be paused in order to be resumed.`); } const queueSettings = opts !== null && opts !== void 0 && opts.enqueue ? { queuedAt: Date.now() } : undefined; return actions.updateReindexOp(op, { status: _types.ReindexStatus.inProgress, reindexOptions: queueSettings ? { queueSettings } : undefined }); }); }, async startQueuedReindexOperation(indexName) { var _reindexOp$attributes2; const reindexOp = await this.findReindexOperation(indexName); if (!reindexOp) { throw _error.error.indexNotFound(`No reindex operation found for index ${indexName}`); } if (!((_reindexOp$attributes2 = reindexOp.attributes.reindexOptions) !== null && _reindexOp$attributes2 !== void 0 && _reindexOp$attributes2.queueSettings)) { throw _error.error.reindexIsNotInQueue(`Reindex operation ${indexName} is not in the queue.`); } return actions.runWhileLocked(reindexOp, async lockedReindexOp => { const { reindexOptions } = lockedReindexOp.attributes; reindexOptions.queueSettings.startedAt = Date.now(); return actions.updateReindexOp(lockedReindexOp, { reindexOptions }); }); }, async cancelReindexing(indexName) { const reindexOp = await this.findReindexOperation(indexName); if (!reindexOp) { throw _error.error.indexNotFound(`No reindex operation found for index ${indexName}`); } else if (reindexOp.attributes.status !== _types.ReindexStatus.inProgress) { throw _error.error.reindexCannotBeCancelled(`Reindex operation is not in progress`); } else if (reindexOp.attributes.lastCompletedStep !== _types.ReindexStep.reindexStarted) { throw _error.error.reindexCannotBeCancelled(`Reindex operation is not currently waiting for reindex task to complete`); } const resp = await esClient.tasks.cancel({ task_id: reindexOp.attributes.reindexTaskId }); if (resp.node_failures && resp.node_failures.length > 0) { throw _error.error.reindexCannotBeCancelled(`Could not cancel reindex.`); } return reindexOp; }, getIndexAliases }; }; exports.reindexServiceFactory = reindexServiceFactory;