"use strict";

var _interopRequireDefault = require("@babel/runtime/helpers/interopRequireDefault");
Object.defineProperty(exports, "__esModule", {
  value: true
});
exports.KnowledgeBaseService = exports.KnowledgeBaseEntryOperationType = void 0;
var _defineProperty2 = _interopRequireDefault(require("@babel/runtime/helpers/defineProperty"));
var _elasticsearch = require("@elastic/elasticsearch");
var _boom = require("@hapi/boom");
var _pLimit = _interopRequireDefault(require("p-limit"));
var _pRetry = _interopRequireDefault(require("p-retry"));
var _lodash = require("lodash");
var _ = require("..");
var _get_access_query = require("../util/get_access_query");
/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the Elastic License
 * 2.0; you may not use this file except in compliance with the Elastic License
 * 2.0.
 */

/**
 * Returns `error.body.error` for an Elasticsearch ResponseError, or undefined
 * when the error is not a ResponseError or its body is not an object (for
 * example a null body or a plain-text body returned by a proxy). Never throws,
 * so it is safe to call from inside a `catch` block.
 */
function getResponseErrorDetails(error) {
  if (!(error instanceof _elasticsearch.errors.ResponseError)) {
    return undefined;
  }
  const body = error.body;
  return body !== null && typeof body === 'object' ? body.error : undefined;
}

/**
 * Returns the Elasticsearch error type (`error.body.error.type`) of a
 * ResponseError, or undefined when it cannot be determined. Never throws.
 */
function getResponseErrorType(error) {
  const details = getResponseErrorDetails(error);
  return details !== null && typeof details === 'object' ? details.type : undefined;
}

/**
 * True when Elasticsearch answered with a `resource_not_found_exception` or a
 * `status_exception`.
 *
 * Despite its name, this does not specifically detect "already exists" errors.
 * In this file it is used to detect a missing model in `recall` (reported as
 * "not ready") and in `setup` (which triggers installation). It is also used to
 * tolerate errors from starting the model deployment (presumably "already
 * started" — TODO confirm).
 */
function isAlreadyExistsError(error) {
  const type = getResponseErrorType(error);
  return type === 'resource_not_found_exception' || type === 'status_exception';
}

const ELSER_MODEL_ID = '.elser_model_1';

/**
 * Throws a Boom 503 (serverUnavailable) signalling that the knowledge base
 * cannot serve requests yet. `body` is attached as the Boom error data.
 */
function throwKnowledgeBaseNotReady(body) {
  throw (0, _boom.serverUnavailable)(`Knowledge base is not ready yet`, body);
}

/** Kinds of operations that can be queued against the knowledge base. */
let KnowledgeBaseEntryOperationType;
exports.KnowledgeBaseEntryOperationType = KnowledgeBaseEntryOperationType;
(function (KnowledgeBaseEntryOperationType) {
  KnowledgeBaseEntryOperationType["Index"] = "index";
  KnowledgeBaseEntryOperationType["Delete"] = "delete";
})(KnowledgeBaseEntryOperationType || (exports.KnowledgeBaseEntryOperationType = KnowledgeBaseEntryOperationType = {}));

/**
 * Stores and retrieves knowledge-base entries in Elasticsearch. Retrieval uses
 * `text_expansion` queries against the ELSER model.
 *
 * Operations submitted through `queue()` before the knowledge base has been
 * seen ready are buffered in memory. They are drained by `processQueue()`,
 * which the scheduled task manager task is presumed to call (TODO confirm
 * against the task definition).
 */
class KnowledgeBaseService {
  /**
   * @param dependencies - expected to provide `esClient`, `resources`
   *   (`aliases.kb`, `pipelines.kb`), `logger` and `taskManagerStart`
   *   (inferred from usage in this class).
   */
  constructor(dependencies) {
    // Set to true the first time processQueue() observes a ready knowledge
    // base. From then on, queue() processes operations immediately.
    (0, _defineProperty2.default)(this, "hasSetup", false);
    // Operations buffered until the knowledge base is ready.
    (0, _defineProperty2.default)(this, "_queue", []);

    /**
     * Returns up to 5 entries matching any of `queries`, restricted by the
     * access filter for `user` and `namespace`. Each entry carries the `text`,
     * `is_correction` and `labels` source fields plus its `score` and `id`.
     * Throws a 503 Boom error if the ELSER model is missing or not ready.
     */
    (0, _defineProperty2.default)(this, "recall", async ({
      user,
      queries,
      namespace
    }) => {
      try {
        const response = await this.dependencies.esClient.search({
          index: this.dependencies.resources.aliases.kb,
          query: {
            bool: {
              should: queries.map(query => ({
                text_expansion: {
                  'ml.tokens': {
                    model_text: query,
                    model_id: ELSER_MODEL_ID
                  }
                }
              })),
              filter: [...(0, _get_access_query.getAccessQuery)({
                user,
                namespace
              })]
            }
          },
          size: 5,
          _source: {
            includes: ['text', 'is_correction', 'labels']
          }
        });
        return {
          entries: response.hits.hits.map(hit => ({
            ...hit._source,
            score: hit._score,
            id: hit._id
          }))
        };
      } catch (error) {
        if (isAlreadyExistsError(error)) {
          throwKnowledgeBaseNotReady(error.body);
        }
        throw error;
      }
    });

    /**
     * Indexes a single entry under its `id`, through the knowledge-base ingest
     * pipeline, stamping `@timestamp`, `user` and `namespace`. Does not refresh
     * the index. A `status_exception` is reported as a 503 "not ready".
     */
    (0, _defineProperty2.default)(this, "summarize", async ({
      entry: {
        id,
        ...document
      },
      user,
      namespace
    }) => {
      try {
        await this.dependencies.esClient.index({
          index: this.dependencies.resources.aliases.kb,
          id,
          document: {
            '@timestamp': new Date().toISOString(),
            ...document,
            user,
            namespace
          },
          pipeline: this.dependencies.resources.pipelines.kb,
          refresh: false
        });
      } catch (error) {
        if (getResponseErrorType(error) === 'status_exception') {
          throwKnowledgeBaseNotReady(error.body);
        }
        throw error;
      }
    });

    /**
     * Reports ELSER deployment readiness. Never rejects: on any failure it
     * resolves `{ error, ready: false }`. `error` is the Elasticsearch error
     * details when available, otherwise the stringified error.
     */
    (0, _defineProperty2.default)(this, "status", async () => {
      try {
        const modelStats = await this.dependencies.esClient.ml.getTrainedModelsStats({
          model_id: ELSER_MODEL_ID
        });
        const elserModelStats = modelStats.trained_model_stats[0];
        const deploymentStats = elserModelStats.deployment_stats;
        const deploymentState = deploymentStats == null ? undefined : deploymentStats.state;
        const allocationState = deploymentStats == null ? undefined : deploymentStats.allocation_status.state;
        return {
          ready: deploymentState === 'started' && allocationState === 'fully_allocated',
          deployment_state: deploymentState,
          allocation_state: allocationState
        };
      } catch (error) {
        // Use the safe accessor so an unexpected response body cannot make
        // this catch block itself throw.
        const details = getResponseErrorDetails(error);
        return {
          error: details == null ? String(error) : details,
          ready: false
        };
      }
    });

    /**
     * Installs the ELSER model if it is missing and starts its deployment. It
     * then waits (with retries) until the deployment is fully allocated, and
     * finally (re)schedules the queue task.
     */
    (0, _defineProperty2.default)(this, "setup", async () => {
      // Up to 12 retries, 10s apart (factor 1 = constant delay).
      const retryOptions = {
        factor: 1,
        minTimeout: 10000,
        retries: 12
      };
      const installModel = async () => {
        this.dependencies.logger.info('Installing ELSER model');
        await this.dependencies.esClient.ml.putTrainedModel({
          model_id: ELSER_MODEL_ID,
          input: {
            field_names: ['text_field']
          },
          // @ts-expect-error (source typings did not declare wait_for_completion)
          wait_for_completion: true
        }, {
          requestTimeout: '20m'
        });
        this.dependencies.logger.info('Finished installing ELSER model');
      };
      const getIsModelInstalled = async () => {
        const getResponse = await this.dependencies.esClient.ml.getTrainedModels({
          model_id: ELSER_MODEL_ID,
          include: 'definition_status'
        });
        const modelConfig = getResponse.trained_model_configs[0];
        this.dependencies.logger.debug('Model definition status:\n' + JSON.stringify(modelConfig));
        return Boolean(modelConfig == null ? undefined : modelConfig.fully_defined);
      };
      await (0, _pRetry.default)(async () => {
        let isModelInstalled = false;
        try {
          isModelInstalled = await getIsModelInstalled();
        } catch (error) {
          if (isAlreadyExistsError(error)) {
            await installModel();
            isModelInstalled = await getIsModelInstalled();
          } else {
            // Not fatal: the "not fully defined" error below triggers a retry,
            // but keep a trace of what actually went wrong.
            this.dependencies.logger.debug('Error checking ELSER model installation status');
            this.dependencies.logger.debug(error);
          }
        }
        if (!isModelInstalled) {
          throwKnowledgeBaseNotReady({
            message: 'Model is not fully defined'
          });
        }
      }, retryOptions);
      try {
        await this.dependencies.esClient.ml.startTrainedModelDeployment({
          model_id: ELSER_MODEL_ID,
          wait_for: 'fully_allocated'
        });
      } catch (error) {
        this.dependencies.logger.debug('Error starting model deployment');
        this.dependencies.logger.debug(error);
        if (!isAlreadyExistsError(error)) {
          throw error;
        }
      }
      await (0, _pRetry.default)(async () => {
        const response = await this.dependencies.esClient.ml.getTrainedModelsStats({
          model_id: ELSER_MODEL_ID
        });
        const modelStats = response.trained_model_stats[0];
        const deploymentStats = modelStats == null ? undefined : modelStats.deployment_stats;
        const allocationState = deploymentStats == null ? undefined : deploymentStats.allocation_status.state;
        if (allocationState === 'fully_allocated') {
          return Promise.resolve();
        }
        this.dependencies.logger.debug('Model is not allocated yet');
        this.dependencies.logger.debug(JSON.stringify(response));
        throw (0, _boom.gatewayTimeout)();
      }, retryOptions);
      this.dependencies.logger.info('Model is ready');
      this.ensureTaskScheduled();
    });
    this.dependencies = dependencies;
    this.ensureTaskScheduled();
  }

  /**
   * Ensures the hourly queue-processing task is scheduled and asks for it to
   * run soon. Fire-and-forget: failures are logged, never thrown.
   */
  ensureTaskScheduled() {
    this.dependencies.taskManagerStart.ensureScheduled({
      taskType: _.INDEX_QUEUED_DOCUMENTS_TASK_TYPE,
      id: _.INDEX_QUEUED_DOCUMENTS_TASK_ID,
      state: {},
      params: {},
      schedule: {
        interval: '1h'
      }
    }).then(() => {
      this.dependencies.logger.debug('Scheduled queue task');
      return this.dependencies.taskManagerStart.runSoon(_.INDEX_QUEUED_DOCUMENTS_TASK_ID);
    }).then(() => {
      this.dependencies.logger.debug('Queue task ran');
    }).catch(err => {
      this.dependencies.logger.error(`Failed to schedule queue task`);
      this.dependencies.logger.error(err);
    });
  }

  /**
   * Applies a single queued operation.
   *
   * A Delete removes every document whose `_id` matches `operation.id` (when
   * given) AND whose fields match every key/value in `operation.labels` (when
   * given). Any other operation indexes `operation.document`.
   */
  async processOperation(operation) {
    if (operation.type === KnowledgeBaseEntryOperationType.Delete) {
      await this.dependencies.esClient.deleteByQuery({
        index: this.dependencies.resources.aliases.kb,
        query: {
          bool: {
            filter: [...(operation.id ? [{
              term: {
                _id: operation.id
              }
            }] : []), ...(operation.labels ? (0, _lodash.map)(operation.labels, (value, key) => {
              return {
                term: {
                  [key]: value
                }
              };
            }) : [])]
          }
        }
      });
      return;
    }
    await this.summarize({
      entry: operation.document
    });
  }

  /**
   * Drains the in-memory queue with at most 5 concurrent operations. Does
   * nothing if the queue is empty or the knowledge base is not ready.
   * Rejects with the first failed operation's error.
   */
  async processQueue() {
    if (!this._queue.length) {
      return;
    }
    if (!(await this.status()).ready) {
      this.dependencies.logger.debug(`Bailing on queue task: KB is not ready yet`);
      return;
    }
    this.dependencies.logger.debug(`Processing queue`);
    this.hasSetup = true;
    this.dependencies.logger.info(`Processing ${this._queue.length} queue operations`);
    const limiter = (0, _pLimit.default)(5);
    const operations = this._queue.concat();
    await Promise.all(operations.map(operation => limiter(async () => {
      // Look the operation up in the live queue: it shrinks as operations
      // start and may grow meanwhile, so snapshot indices are not valid here.
      const index = this._queue.indexOf(operation);
      if (index !== -1) {
        this._queue.splice(index, 1);
      }
      await this.processOperation(operation);
    })));
    this.dependencies.logger.info('Processed all queued operations');
  }

  /**
   * Submits operations. Before the knowledge base has been seen ready they are
   * buffered for processQueue(). Afterwards they are processed immediately in
   * the background (at most 5 at a time), with failures logged, not thrown.
   */
  queue(operations) {
    if (!operations.length) {
      return;
    }
    if (!this.hasSetup) {
      this._queue.push(...operations);
      return;
    }
    const limiter = (0, _pLimit.default)(5);
    // Process the operations passed in, not the (already drained) buffer.
    const limitedFunctions = operations.map(operation => limiter(() => this.processOperation(operation)));
    Promise.all(limitedFunctions).catch(err => {
      this.dependencies.logger.error(`Failed to process all queued operations`);
      this.dependencies.logger.error(err);
    });
  }
}
exports.KnowledgeBaseService = KnowledgeBaseService;