"use strict"; Object.defineProperty(exports, "__esModule", { value: true }); exports.defineLogRateAnalysisRoute = void 0; var _async = require("async"); var _i18n = require("@kbn/i18n"); var _fieldTypes = require("@kbn/field-types"); var _server = require("@kbn/ml-response-stream/server"); var _mlAggUtils = require("@kbn/ml-agg-utils"); var _mlRouteUtils = require("@kbn/ml-route-utils"); var _constants = require("../../common/constants"); var _log_rate_analysis = require("../../common/api/log_rate_analysis"); var _api = require("../../common/api"); var _common = require("../../common"); var _is_request_aborted_error = require("../lib/is_request_aborted_error"); var _fetch_significant_term_p_values = require("./queries/fetch_significant_term_p_values"); var _fetch_index_info = require("./queries/fetch_index_info"); var _fetch_frequent_item_sets = require("./queries/fetch_frequent_item_sets"); var _get_histogram_query = require("./queries/get_histogram_query"); var _get_group_filter = require("./queries/get_group_filter"); var _get_significant_term_groups = require("./queries/get_significant_term_groups"); /* * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one * or more contributor license agreements. Licensed under the Elastic License * 2.0; you may not use this file except in compliance with the Elastic License * 2.0. */ // 10s ping frequency to keep the stream alive. const PING_FREQUENCY = 10000; // Overall progress is a float from 0 to 1. const LOADED_FIELD_CANDIDATES = 0.2; const PROGRESS_STEP_P_VALUES = 0.5; const PROGRESS_STEP_GROUPING = 0.1; const PROGRESS_STEP_HISTOGRAMS = 0.1; const PROGRESS_STEP_HISTOGRAMS_GROUPS = 0.1; const defineLogRateAnalysisRoute = (router, license, logger, coreStart) => { router.versioned.post({ path: _api.AIOPS_API_ENDPOINT.LOG_RATE_ANALYSIS, access: 'internal' }).addVersion({ version: '1', validate: { request: { body: _log_rate_analysis.aiopsLogRateAnalysisSchema } } }, async (context, request, response) => { if (!license.isActivePlatinumLicense) { return response.forbidden(); } const client = (await context.core).elasticsearch.client.asCurrentUser; const executionContext = (0, _mlRouteUtils.createExecutionContext)(coreStart, _common.PLUGIN_ID, request.route.path); return await coreStart.executionContext.withContext(executionContext, () => { var _request$body$sampleP; let logMessageCounter = 1; function logDebugMessage(msg) { logger.debug(`Log Rate Analysis #${logMessageCounter}: ${msg}`); logMessageCounter++; } logDebugMessage('Starting analysis.'); const groupingEnabled = !!request.body.grouping; const sampleProbability = (_request$body$sampleP = request.body.sampleProbability) !== null && _request$body$sampleP !== void 0 ? _request$body$sampleP : 1; const controller = new AbortController(); const abortSignal = controller.signal; let isRunning = false; let loaded = 0; let shouldStop = false; request.events.aborted$.subscribe(() => { logDebugMessage('aborted$ subscription trigger.'); shouldStop = true; controller.abort(); }); request.events.completed$.subscribe(() => { logDebugMessage('completed$ subscription trigger.'); shouldStop = true; controller.abort(); }); const { end: streamEnd, push, responseWithHeaders } = (0, _server.streamFactory)(request.headers, logger, request.body.compressResponse, request.body.flushFix); function pushPingWithTimeout() { setTimeout(() => { if (isRunning) { logDebugMessage('Ping message.'); push((0, _log_rate_analysis.pingAction)()); pushPingWithTimeout(); } }, PING_FREQUENCY); } function end() { if (isRunning) { isRunning = false; logDebugMessage('Ending analysis.'); streamEnd(); } else { logDebugMessage('end() was called again with isRunning already being false.'); } } function endWithUpdatedLoadingState() { push((0, _log_rate_analysis.updateLoadingStateAction)({ ccsWarning: false, loaded: 1, loadingState: _i18n.i18n.translate('xpack.aiops.logRateAnalysis.loadingState.doneMessage', { defaultMessage: 'Done.' }) })); end(); } function pushError(m) { logDebugMessage('Push error.'); push((0, _log_rate_analysis.addErrorAction)(m)); } async function runAnalysis() { try { var _request$body$overrid, _request$body$overrid2, _request$body$overrid5, _request$body$overrid6, _request$body$overrid7, _request$body$overrid8, _request$body$overrid13; isRunning = true; if (!request.body.overrides) { logDebugMessage('Full Reset.'); push((0, _log_rate_analysis.resetAllAction)()); } else { logDebugMessage('Reset Errors.'); push((0, _log_rate_analysis.resetErrorsAction)()); } if ((_request$body$overrid = request.body.overrides) !== null && _request$body$overrid !== void 0 && _request$body$overrid.regroupOnly) { logDebugMessage('Reset Groups.'); push((0, _log_rate_analysis.resetGroupsAction)()); } if ((_request$body$overrid2 = request.body.overrides) !== null && _request$body$overrid2 !== void 0 && _request$body$overrid2.loaded) { var _request$body$overrid3, _request$body$overrid4; logDebugMessage(`Set 'loaded' override to '${(_request$body$overrid3 = request.body.overrides) === null || _request$body$overrid3 === void 0 ? void 0 : _request$body$overrid3.loaded}'.`); loaded = (_request$body$overrid4 = request.body.overrides) === null || _request$body$overrid4 === void 0 ? void 0 : _request$body$overrid4.loaded; } pushPingWithTimeout(); // Step 1: Index Info: Field candidates, total doc count, sample probability const fieldCandidates = []; let fieldCandidatesCount = fieldCandidates.length; let totalDocCount = 0; if (!((_request$body$overrid5 = request.body.overrides) !== null && _request$body$overrid5 !== void 0 && _request$body$overrid5.remainingFieldCandidates)) { logDebugMessage('Fetch index information.'); push((0, _log_rate_analysis.updateLoadingStateAction)({ ccsWarning: false, loaded, loadingState: _i18n.i18n.translate('xpack.aiops.logRateAnalysis.loadingState.loadingIndexInformation', { defaultMessage: 'Loading index information.' }) })); try { const indexInfo = await (0, _fetch_index_info.fetchIndexInfo)(client, request.body, abortSignal); fieldCandidates.push(...indexInfo.fieldCandidates); fieldCandidatesCount = fieldCandidates.length; totalDocCount = indexInfo.totalDocCount; } catch (e) { if (!(0, _is_request_aborted_error.isRequestAbortedError)(e)) { logger.error(`Failed to fetch index information, got: \n${e.toString()}`); pushError(`Failed to fetch index information.`); } end(); return; } logDebugMessage(`Total document count: ${totalDocCount}`); logDebugMessage(`Sample probability: ${sampleProbability}`); loaded += LOADED_FIELD_CANDIDATES; pushPingWithTimeout(); push((0, _log_rate_analysis.updateLoadingStateAction)({ ccsWarning: false, loaded, loadingState: _i18n.i18n.translate('xpack.aiops.logRateAnalysis.loadingState.identifiedFieldCandidates', { defaultMessage: 'Identified {fieldCandidatesCount, plural, one {# field candidate} other {# field candidates}}.', values: { fieldCandidatesCount } }) })); if (fieldCandidatesCount === 0) { endWithUpdatedLoadingState(); } else if (shouldStop) { logDebugMessage('shouldStop after fetching field candidates.'); end(); return; } } // Step 2: Significant Terms const significantTerms = (_request$body$overrid6 = request.body.overrides) !== null && _request$body$overrid6 !== void 0 && _request$body$overrid6.significantTerms ? (_request$body$overrid7 = request.body.overrides) === null || _request$body$overrid7 === void 0 ? void 0 : _request$body$overrid7.significantTerms : []; const fieldsToSample = new Set(); // Don't use more than 10 here otherwise Kibana will emit an error // regarding a limit of abort signal listeners of more than 10. const MAX_CONCURRENT_QUERIES = 10; let remainingFieldCandidates; let loadingStepSizePValues = PROGRESS_STEP_P_VALUES; if ((_request$body$overrid8 = request.body.overrides) !== null && _request$body$overrid8 !== void 0 && _request$body$overrid8.remainingFieldCandidates) { var _request$body$overrid9, _request$body$overrid10, _request$body$overrid11, _request$body$overrid12; fieldCandidates.push(...((_request$body$overrid9 = request.body.overrides) === null || _request$body$overrid9 === void 0 ? void 0 : _request$body$overrid9.remainingFieldCandidates)); remainingFieldCandidates = (_request$body$overrid10 = request.body.overrides) === null || _request$body$overrid10 === void 0 ? void 0 : _request$body$overrid10.remainingFieldCandidates; fieldCandidatesCount = fieldCandidates.length; loadingStepSizePValues = LOADED_FIELD_CANDIDATES + PROGRESS_STEP_P_VALUES - ((_request$body$overrid11 = (_request$body$overrid12 = request.body.overrides) === null || _request$body$overrid12 === void 0 ? void 0 : _request$body$overrid12.loaded) !== null && _request$body$overrid11 !== void 0 ? _request$body$overrid11 : PROGRESS_STEP_P_VALUES); } else { remainingFieldCandidates = fieldCandidates; } logDebugMessage('Fetch p-values.'); const pValuesQueue = (0, _async.queue)(async function (fieldCandidate) { loaded += 1 / fieldCandidatesCount * loadingStepSizePValues; let pValues; try { pValues = await (0, _fetch_significant_term_p_values.fetchSignificantTermPValues)(client, request.body, [fieldCandidate], logger, sampleProbability, pushError, abortSignal); } catch (e) { if (!(0, _is_request_aborted_error.isRequestAbortedError)(e)) { logger.error(`Failed to fetch p-values for '${fieldCandidate}', got: \n${e.toString()}`); pushError(`Failed to fetch p-values for '${fieldCandidate}'.`); } return; } remainingFieldCandidates = remainingFieldCandidates.filter(d => d !== fieldCandidate); if (pValues.length > 0) { pValues.forEach(d => { fieldsToSample.add(d.fieldName); }); significantTerms.push(...pValues); push((0, _log_rate_analysis.addSignificantTermsAction)(pValues)); } push((0, _log_rate_analysis.updateLoadingStateAction)({ ccsWarning: false, loaded, loadingState: _i18n.i18n.translate('xpack.aiops.logRateAnalysis.loadingState.identifiedFieldValuePairs', { defaultMessage: 'Identified {fieldValuePairsCount, plural, one {# significant field/value pair} other {# significant field/value pairs}}.', values: { fieldValuePairsCount: significantTerms.length } }), remainingFieldCandidates })); }, MAX_CONCURRENT_QUERIES); pValuesQueue.push(fieldCandidates, err => { if (err) { logger.error(`Failed to fetch p-values.', got: \n${err.toString()}`); pushError(`Failed to fetch p-values.`); pValuesQueue.kill(); end(); } else if (shouldStop) { logDebugMessage('shouldStop fetching p-values.'); pValuesQueue.kill(); end(); } }); await pValuesQueue.drain(); if (significantTerms.length === 0) { logDebugMessage('Stopping analysis, did not find significant terms.'); endWithUpdatedLoadingState(); return; } const histogramFields = [{ fieldName: request.body.timeFieldName, type: _fieldTypes.KBN_FIELD_TYPES.DATE }]; logDebugMessage('Fetch overall histogram.'); let overallTimeSeries; const overallHistogramQuery = (0, _get_histogram_query.getHistogramQuery)(request.body); try { overallTimeSeries = (await (0, _mlAggUtils.fetchHistogramsForFields)(client, request.body.index, overallHistogramQuery, // fields histogramFields, // samplerShardSize -1, undefined, abortSignal, sampleProbability, _constants.RANDOM_SAMPLER_SEED))[0]; } catch (e) { if (!(0, _is_request_aborted_error.isRequestAbortedError)(e)) { logger.error(`Failed to fetch the overall histogram data, got: \n${e.toString()}`); pushError(`Failed to fetch overall histogram data.`); } // Still continue the analysis even if loading the overall histogram fails. } function pushHistogramDataLoadingState() { push((0, _log_rate_analysis.updateLoadingStateAction)({ ccsWarning: false, loaded, loadingState: _i18n.i18n.translate('xpack.aiops.logRateAnalysis.loadingState.loadingHistogramData', { defaultMessage: 'Loading histogram data.' }) })); } if (shouldStop) { logDebugMessage('shouldStop after fetching overall histogram.'); end(); return; } if (groupingEnabled) { logDebugMessage('Group results.'); push((0, _log_rate_analysis.updateLoadingStateAction)({ ccsWarning: false, loaded, loadingState: _i18n.i18n.translate('xpack.aiops.logRateAnalysis.loadingState.groupingResults', { defaultMessage: 'Transforming significant field/value pairs into groups.' }), groupsMissing: true })); try { const { fields, df } = await (0, _fetch_frequent_item_sets.fetchFrequentItemSets)(client, request.body.index, JSON.parse(request.body.searchQuery), significantTerms, request.body.timeFieldName, request.body.deviationMin, request.body.deviationMax, logger, sampleProbability, pushError, abortSignal); if (shouldStop) { logDebugMessage('shouldStop after fetching frequent_item_sets.'); end(); return; } if (fields.length > 0 && df.length > 0) { const significantTermGroups = (0, _get_significant_term_groups.getSignificantTermGroups)(df, significantTerms, fields); // We'll find out if there's at least one group with at least two items, // only then will we return the groups to the clients and make the grouping option available. const maxItems = Math.max(...significantTermGroups.map(g => g.group.length)); if (maxItems > 1) { push((0, _log_rate_analysis.addSignificantTermsGroupAction)(significantTermGroups)); } loaded += PROGRESS_STEP_GROUPING; pushHistogramDataLoadingState(); if (shouldStop) { logDebugMessage('shouldStop after grouping.'); end(); return; } logDebugMessage(`Fetch ${significantTermGroups.length} group histograms.`); const groupHistogramQueue = (0, _async.queue)(async function (cpg) { if (shouldStop) { logDebugMessage('shouldStop abort fetching group histograms.'); groupHistogramQueue.kill(); end(); return; } if (overallTimeSeries !== undefined) { var _overallTimeSeries$da; const histogramQuery = (0, _get_histogram_query.getHistogramQuery)(request.body, (0, _get_group_filter.getGroupFilter)(cpg)); let cpgTimeSeries; try { cpgTimeSeries = (await (0, _mlAggUtils.fetchHistogramsForFields)(client, request.body.index, histogramQuery, // fields [{ fieldName: request.body.timeFieldName, type: _fieldTypes.KBN_FIELD_TYPES.DATE, interval: overallTimeSeries.interval, min: overallTimeSeries.stats[0], max: overallTimeSeries.stats[1] }], // samplerShardSize -1, undefined, abortSignal, sampleProbability, _constants.RANDOM_SAMPLER_SEED))[0]; } catch (e) { if (!(0, _is_request_aborted_error.isRequestAbortedError)(e)) { logger.error(`Failed to fetch the histogram data for group #${cpg.id}, got: \n${e.toString()}`); pushError(`Failed to fetch the histogram data for group #${cpg.id}.`); } return; } const histogram = (_overallTimeSeries$da = overallTimeSeries.data.map((o, i) => { var _cpgTimeSeries$data$f, _o$key_as_string; const current = (_cpgTimeSeries$data$f = cpgTimeSeries.data.find(d1 => d1.key_as_string === o.key_as_string)) !== null && _cpgTimeSeries$data$f !== void 0 ? _cpgTimeSeries$data$f : { doc_count: 0 }; return { key: o.key, key_as_string: (_o$key_as_string = o.key_as_string) !== null && _o$key_as_string !== void 0 ? _o$key_as_string : '', doc_count_significant_term: current.doc_count, doc_count_overall: Math.max(0, o.doc_count - current.doc_count) }; })) !== null && _overallTimeSeries$da !== void 0 ? _overallTimeSeries$da : []; push((0, _log_rate_analysis.addSignificantTermsGroupHistogramAction)([{ id: cpg.id, histogram }])); } }, MAX_CONCURRENT_QUERIES); groupHistogramQueue.push(significantTermGroups); await groupHistogramQueue.drain(); } } catch (e) { if (!(0, _is_request_aborted_error.isRequestAbortedError)(e)) { logger.error(`Failed to transform field/value pairs into groups, got: \n${e.toString()}`); pushError(`Failed to transform field/value pairs into groups.`); } } } loaded += PROGRESS_STEP_HISTOGRAMS_GROUPS; logDebugMessage(`Fetch ${significantTerms.length} field/value histograms.`); // time series filtered by fields if (significantTerms.length > 0 && overallTimeSeries !== undefined && !((_request$body$overrid13 = request.body.overrides) !== null && _request$body$overrid13 !== void 0 && _request$body$overrid13.regroupOnly)) { const fieldValueHistogramQueue = (0, _async.queue)(async function (cp) { if (shouldStop) { logDebugMessage('shouldStop abort fetching field/value histograms.'); fieldValueHistogramQueue.kill(); end(); return; } if (overallTimeSeries !== undefined) { var _overallTimeSeries$da2; const histogramQuery = (0, _get_histogram_query.getHistogramQuery)(request.body, [{ term: { [cp.fieldName]: cp.fieldValue } }]); let cpTimeSeries; try { cpTimeSeries = (await (0, _mlAggUtils.fetchHistogramsForFields)(client, request.body.index, histogramQuery, // fields [{ fieldName: request.body.timeFieldName, type: _fieldTypes.KBN_FIELD_TYPES.DATE, interval: overallTimeSeries.interval, min: overallTimeSeries.stats[0], max: overallTimeSeries.stats[1] }], // samplerShardSize -1, undefined, abortSignal, sampleProbability, _constants.RANDOM_SAMPLER_SEED))[0]; } catch (e) { logger.error(`Failed to fetch the histogram data for field/value pair "${cp.fieldName}:${cp.fieldValue}", got: \n${e.toString()}`); pushError(`Failed to fetch the histogram data for field/value pair "${cp.fieldName}:${cp.fieldValue}".`); return; } const histogram = (_overallTimeSeries$da2 = overallTimeSeries.data.map((o, i) => { var _cpTimeSeries$data$fi, _o$key_as_string2; const current = (_cpTimeSeries$data$fi = cpTimeSeries.data.find(d1 => d1.key_as_string === o.key_as_string)) !== null && _cpTimeSeries$data$fi !== void 0 ? _cpTimeSeries$data$fi : { doc_count: 0 }; return { key: o.key, key_as_string: (_o$key_as_string2 = o.key_as_string) !== null && _o$key_as_string2 !== void 0 ? _o$key_as_string2 : '', doc_count_significant_term: current.doc_count, doc_count_overall: Math.max(0, o.doc_count - current.doc_count) }; })) !== null && _overallTimeSeries$da2 !== void 0 ? _overallTimeSeries$da2 : []; const { fieldName, fieldValue } = cp; loaded += 1 / significantTerms.length * PROGRESS_STEP_HISTOGRAMS; pushHistogramDataLoadingState(); push((0, _log_rate_analysis.addSignificantTermsHistogramAction)([{ fieldName, fieldValue, histogram }])); } }, MAX_CONCURRENT_QUERIES); fieldValueHistogramQueue.push(significantTerms); await fieldValueHistogramQueue.drain(); } endWithUpdatedLoadingState(); } catch (e) { if (!(0, _is_request_aborted_error.isRequestAbortedError)(e)) { logger.error(`Log Rate Analysis failed to finish, got: \n${e.toString()}`); pushError(`Log Rate Analysis failed to finish.`); } end(); } } // Do not call this using `await` so it will run asynchronously while we return the stream already. runAnalysis(); return response.ok(responseWithHeaders); }); }); }; exports.defineLogRateAnalysisRoute = defineLogRateAnalysisRoute;