"use strict"; Object.defineProperty(exports, "__esModule", { value: true }); exports.nextActionMap = exports.next = void 0; var _pipeable = require("fp-ts/lib/pipeable"); var Option = _interopRequireWildcard(require("fp-ts/lib/Option")); var TaskEither = _interopRequireWildcard(require("fp-ts/lib/TaskEither")); var _lodash = require("lodash"); var _utils = require("./common/utils"); var Actions = _interopRequireWildcard(require("./actions")); var _core = require("./core"); function _getRequireWildcardCache(nodeInterop) { if (typeof WeakMap !== "function") return null; var cacheBabelInterop = new WeakMap(); var cacheNodeInterop = new WeakMap(); return (_getRequireWildcardCache = function (nodeInterop) { return nodeInterop ? cacheNodeInterop : cacheBabelInterop; })(nodeInterop); } function _interopRequireWildcard(obj, nodeInterop) { if (!nodeInterop && obj && obj.__esModule) { return obj; } if (obj === null || typeof obj !== "object" && typeof obj !== "function") { return { default: obj }; } var cache = _getRequireWildcardCache(nodeInterop); if (cache && cache.has(obj)) { return cache.get(obj); } var newObj = {}; var hasPropertyDescriptor = Object.defineProperty && Object.getOwnPropertyDescriptor; for (var key in obj) { if (key !== "default" && Object.prototype.hasOwnProperty.call(obj, key)) { var desc = hasPropertyDescriptor ? Object.getOwnPropertyDescriptor(obj, key) : null; if (desc && (desc.get || desc.set)) { Object.defineProperty(newObj, key, desc); } else { newObj[key] = obj[key]; } } } newObj.default = obj; if (cache) { cache.set(obj, newObj); } return newObj; } /* * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one * or more contributor license agreements. Licensed under the Elastic License * 2.0 and the Server Side Public License, v 1; you may not use this file except * in compliance with, at your election, the Elastic License 2.0 or the Server * Side Public License, v 1. */ const nextActionMap = (client, transformRawDocs, readyToReindex, doneReindexing, updateRelocationAliases) => { return { INIT: state => Actions.initAction({ client, indices: [state.currentAlias, state.versionAlias] }), WAIT_FOR_MIGRATION_COMPLETION: state => Actions.fetchIndices({ client, indices: [state.currentAlias, state.versionAlias] }), WAIT_FOR_YELLOW_SOURCE: state => Actions.waitForIndexStatus({ client, index: state.sourceIndex.value, status: 'yellow' }), UPDATE_SOURCE_MAPPINGS_PROPERTIES: ({ sourceIndex, sourceIndexMappings, targetIndexMappings }) => Actions.updateSourceMappingsProperties({ client, sourceIndex: sourceIndex.value, sourceMappings: sourceIndexMappings.value, targetMappings: targetIndexMappings }), CLEANUP_UNKNOWN_AND_EXCLUDED: state => Actions.cleanupUnknownAndExcluded({ client, indexName: state.sourceIndex.value, discardUnknownDocs: state.discardUnknownObjects, excludeOnUpgradeQuery: state.excludeOnUpgradeQuery, excludeFromUpgradeFilterHooks: state.excludeFromUpgradeFilterHooks, knownTypes: state.knownTypes, removedTypes: _core.REMOVED_TYPES }), CLEANUP_UNKNOWN_AND_EXCLUDED_WAIT_FOR_TASK: state => Actions.waitForDeleteByQueryTask({ client, taskId: state.deleteByQueryTaskId, timeout: '120s' }), PREPARE_COMPATIBLE_MIGRATION: state => Actions.updateAliases({ client, aliasActions: state.preTransformDocsActions }), REFRESH_SOURCE: state => Actions.refreshIndex({ client, index: state.sourceIndex.value }), CHECK_UNKNOWN_DOCUMENTS: state => Actions.checkForUnknownDocs({ client, indexName: state.sourceIndex.value, excludeOnUpgradeQuery: state.excludeOnUpgradeQuery, knownTypes: state.knownTypes }), SET_SOURCE_WRITE_BLOCK: state => Actions.setWriteBlock({ client, index: state.sourceIndex.value }), CALCULATE_EXCLUDE_FILTERS: state => Actions.calculateExcludeFilters({ client, excludeFromUpgradeFilterHooks: state.excludeFromUpgradeFilterHooks }), CREATE_NEW_TARGET: state => Actions.createIndex({ client, indexName: state.targetIndex, mappings: state.targetIndexMappings }), CREATE_REINDEX_TEMP: state => Actions.createIndex({ client, indexName: state.tempIndex, aliases: [state.tempIndexAlias], mappings: state.tempIndexMappings }), READY_TO_REINDEX_SYNC: () => Actions.synchronizeMigrators({ waitGroup: readyToReindex }), REINDEX_SOURCE_TO_TEMP_OPEN_PIT: state => Actions.openPit({ client, index: state.sourceIndex.value }), REINDEX_SOURCE_TO_TEMP_READ: state => Actions.readWithPit({ client, pitId: state.sourceIndexPitId, /* When reading we use a source query to exclude saved objects types which * are no longer used. These saved objects will still be kept in the outdated * index for backup purposes, but won't be available in the upgraded index. */ query: state.excludeOnUpgradeQuery, batchSize: state.batchSize, searchAfter: state.lastHitSortValue }), REINDEX_SOURCE_TO_TEMP_CLOSE_PIT: state => Actions.closePit({ client, pitId: state.sourceIndexPitId }), REINDEX_SOURCE_TO_TEMP_TRANSFORM: state => Actions.transformDocs({ transformRawDocs, outdatedDocuments: state.outdatedDocuments }), REINDEX_SOURCE_TO_TEMP_INDEX_BULK: state => Actions.bulkOverwriteTransformedDocuments({ client, /* * Since other nodes can delete the temp index while we're busy writing * to it, we use the alias to prevent the auto-creation of the index if * it doesn't exist. */ index: state.tempIndexAlias, useAliasToPreventAutoCreate: true, operations: state.bulkOperationBatches[state.currentBatch], /** * Since we don't run a search against the target index, we disable "refresh" to speed up * the migration process. * Although any further step must run "refresh" for the target index * before we reach out to the OUTDATED_DOCUMENTS_SEARCH_OPEN_PIT step. * Right now, it's performed during REFRESH_TARGET step. */ refresh: false }), DONE_REINDEXING_SYNC: () => Actions.synchronizeMigrators({ waitGroup: doneReindexing }), SET_TEMP_WRITE_BLOCK: state => Actions.setWriteBlock({ client, index: state.tempIndex }), CLONE_TEMP_TO_TARGET: state => Actions.cloneIndex({ client, source: state.tempIndex, target: state.targetIndex }), REFRESH_TARGET: state => Actions.refreshIndex({ client, index: state.targetIndex }), CHECK_TARGET_MAPPINGS: state => Actions.checkTargetMappings({ actualMappings: Option.toUndefined(state.sourceIndexMappings), expectedMappings: state.targetIndexMappings }), UPDATE_TARGET_MAPPINGS_PROPERTIES: state => Actions.updateAndPickupMappings({ client, index: state.targetIndex, mappings: (0, _lodash.omit)(state.targetIndexMappings, ['_meta']), // ._meta property will be updated on a later step batchSize: state.batchSize, query: Option.toUndefined(state.updatedTypesQuery) }), UPDATE_TARGET_MAPPINGS_PROPERTIES_WAIT_FOR_TASK: state => Actions.waitForPickupUpdatedMappingsTask({ client, taskId: state.updateTargetMappingsTaskId, timeout: '60s' }), UPDATE_TARGET_MAPPINGS_META: state => { return Actions.updateMappings({ client, index: state.targetIndex, mappings: (0, _lodash.omit)(state.targetIndexMappings, ['properties']) // properties already updated on a previous step }); }, CHECK_VERSION_INDEX_READY_ACTIONS: () => Actions.noop, OUTDATED_DOCUMENTS_SEARCH_OPEN_PIT: state => Actions.openPit({ client, index: state.targetIndex }), OUTDATED_DOCUMENTS_SEARCH_READ: state => Actions.readWithPit({ client, pitId: state.pitId, // search for outdated documents only query: state.outdatedDocumentsQuery, batchSize: state.batchSize, searchAfter: state.lastHitSortValue, maxResponseSizeBytes: state.maxReadBatchSizeBytes }), OUTDATED_DOCUMENTS_SEARCH_CLOSE_PIT: state => Actions.closePit({ client, pitId: state.pitId }), OUTDATED_DOCUMENTS_REFRESH: state => Actions.refreshIndex({ client, index: state.targetIndex }), OUTDATED_DOCUMENTS_TRANSFORM: state => Actions.transformDocs({ transformRawDocs, outdatedDocuments: state.outdatedDocuments }), TRANSFORMED_DOCUMENTS_BULK_INDEX: state => Actions.bulkOverwriteTransformedDocuments({ client, index: state.targetIndex, operations: state.bulkOperationBatches[state.currentBatch], /** * Since we don't run a search against the target index, we disable "refresh" to speed up * the migration process. * Although any further step must run "refresh" for the target index * Right now, it's performed during OUTDATED_DOCUMENTS_REFRESH step. */ refresh: false }), MARK_VERSION_INDEX_READY: state => Actions.updateAliases({ client, aliasActions: state.versionIndexReadyActions.value }), MARK_VERSION_INDEX_READY_SYNC: state => (0, _pipeable.pipe)( // First, we wait for all the migrators involved in a relocation to reach this point. Actions.synchronizeMigrators({ waitGroup: updateRelocationAliases, payload: state.versionIndexReadyActions.value }), // Then, all migrators will try to update all aliases (from all indices). Only the first one will succeed. // The others will receive alias_not_found_exception and cause MARK_VERSION_INDEX_READY_CONFLICT (that's acceptable). TaskEither.chainW(({ data }) => Actions.updateAliases({ client, aliasActions: data.flat() }))), MARK_VERSION_INDEX_READY_CONFLICT: state => Actions.fetchIndices({ client, indices: [state.currentAlias, state.versionAlias] }), LEGACY_SET_WRITE_BLOCK: state => Actions.setWriteBlock({ client, index: state.legacyIndex }), LEGACY_CREATE_REINDEX_TARGET: state => Actions.createIndex({ client, indexName: state.sourceIndex.value, mappings: state.sourceIndexMappings.value }), LEGACY_REINDEX: state => Actions.reindex({ client, sourceIndex: state.legacyIndex, targetIndex: state.sourceIndex.value, reindexScript: state.preMigrationScript, requireAlias: false, excludeOnUpgradeQuery: state.excludeOnUpgradeQuery, batchSize: state.batchSize }), LEGACY_REINDEX_WAIT_FOR_TASK: state => Actions.waitForReindexTask({ client, taskId: state.legacyReindexTaskId, timeout: '60s' }), LEGACY_DELETE: state => Actions.updateAliases({ client, aliasActions: state.legacyPreMigrationDoneActions }) }; }; exports.nextActionMap = nextActionMap; const next = (client, transformRawDocs, readyToReindex, doneReindexing, updateRelocationAliases) => { const map = nextActionMap(client, transformRawDocs, readyToReindex, doneReindexing, updateRelocationAliases); return state => { const delay = (0, _utils.createDelayFn)(state); if (state.controlState === 'DONE' || state.controlState === 'FATAL') { // Return null if we're in one of the terminating states return null; } else { // Otherwise return the delayed action // We use an explicit cast as otherwise TS infers `(state: never) => ...` // here because state is inferred to be the intersection of all states // instead of the union. const nextAction = map[state.controlState]; return delay(nextAction(state)); } }; }; exports.next = next;