"use strict"; var _interopRequireDefault = require("@babel/runtime/helpers/interopRequireDefault"); Object.defineProperty(exports, "__esModule", { value: true }); exports.createBackgroundTaskUtilizationAggregator = createBackgroundTaskUtilizationAggregator; exports.summarizeUtilizationStats = summarizeUtilizationStats; var _lodash = require("lodash"); var _statsLite = _interopRequireDefault(require("stats-lite")); var _rxjs = require("rxjs"); var _result_type = require("../lib/result_type"); var _task_events = require("../task_events"); var _task_run_calcultors = require("./task_run_calcultors"); var _config = require("../config"); /* * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one * or more contributor license agreements. Licensed under the Elastic License * 2.0; you may not use this file except in compliance with the Elastic License * 2.0. */ function createBackgroundTaskUtilizationAggregator(taskPollingLifecycle, adHocTaskCounter, pollInterval, workerUtilizationRunningAverageWindowSize = _config.DEFAULT_WORKER_UTILIZATION_RUNNING_AVERAGE_WINDOW) { const taskRunEventToAdhocStat = createTaskRunEventToAdhocStat(); const taskRunAdhocEvents$ = taskPollingLifecycle.events.pipe((0, _rxjs.filter)(taskEvent => (0, _task_events.isTaskRunEvent)(taskEvent) && hasTiming(taskEvent)), (0, _rxjs.map)(taskEvent => ({ taskEvent, ...(0, _result_type.unwrap)(taskEvent.event) })), (0, _rxjs.filter)(({ task }) => (0, _lodash.get)(task, 'schedule.interval', null) == null), (0, _rxjs.map)(({ taskEvent }) => { return taskRunEventToAdhocStat(taskEvent.timing, adHocTaskCounter, pollInterval); })); const taskRunEventToRecurringStat = createTaskRunEventToRecurringStat(); const taskRunRecurringEvents$ = taskPollingLifecycle.events.pipe((0, _rxjs.filter)(taskEvent => (0, _task_events.isTaskRunEvent)(taskEvent) && hasTiming(taskEvent)), (0, _rxjs.map)(taskEvent => ({ taskEvent, ...(0, _result_type.unwrap)(taskEvent.event) })), (0, _rxjs.filter)(({ task }) => (0, _lodash.get)(task, 'schedule.interval', null) != null), (0, _rxjs.map)(({ taskEvent, task }) => { return taskRunEventToRecurringStat(taskEvent.timing, task, pollInterval); })); const taskManagerUtilizationEventToLoadStat = createTaskRunEventToLoadStat(workerUtilizationRunningAverageWindowSize); const taskManagerWorkerUtilizationEvent$ = taskPollingLifecycle.events.pipe((0, _rxjs.filter)(_task_events.isTaskManagerWorkerUtilizationStatEvent), (0, _rxjs.map)(taskEvent => taskEvent.event), (0, _rxjs.map)((0, _result_type.mapOk)(num => taskManagerUtilizationEventToLoadStat(num)))); return (0, _rxjs.combineLatest)([taskRunAdhocEvents$.pipe((0, _rxjs.startWith)({ adhoc: { created: { counter: 0 }, ran: { service_time: { actual: 0, adjusted: 0, task_counter: 0 } } } })), taskRunRecurringEvents$.pipe((0, _rxjs.startWith)({ recurring: { ran: { service_time: { actual: 0, adjusted: 0, task_counter: 0 } } } })), taskManagerWorkerUtilizationEvent$.pipe((0, _rxjs.startWith)({ load: 0 }))]).pipe((0, _rxjs.map)(([adhoc, recurring, load]) => { return { key: 'utilization', value: { ...adhoc, ...recurring, ...load } }; })); } function hasTiming(taskEvent) { return !!(taskEvent !== null && taskEvent !== void 0 && taskEvent.timing); } function summarizeUtilizationStats({ lastUpdate, monitoredStats, isInternal }) { const utilizationStats = monitoredStats === null || monitoredStats === void 0 ? void 0 : monitoredStats.value; if (!monitoredStats || !utilizationStats) { return { last_update: lastUpdate, stats: null }; } return { last_update: lastUpdate, stats: { timestamp: monitoredStats.timestamp, value: isInternal ? utilizationStats : (0, _lodash.pick)(utilizationStats, 'load') } }; } function createTaskRunEventToAdhocStat() { let createdCounter = 0; let actualCounter = 0; let adjustedCounter = 0; let taskCounter = 0; return (timing, adHocTaskCounter, pollInterval) => { const { duration, adjusted } = getServiceTimeStats(timing, pollInterval); const created = adHocTaskCounter.count; adHocTaskCounter.reset(); return { adhoc: { created: { counter: createdCounter += created }, ran: { service_time: { actual: actualCounter += duration, adjusted: adjustedCounter += adjusted, task_counter: taskCounter += 1 } } } }; }; } function createTaskRunEventToRecurringStat() { let actualCounter = 0; let adjustedCounter = 0; let taskCounter = 0; return (timing, task, pollInterval) => { const { duration, adjusted } = getServiceTimeStats(timing, pollInterval); return { recurring: { ran: { service_time: { actual: actualCounter += duration, adjusted: adjustedCounter += adjusted, task_counter: taskCounter += 1 } } } }; }; } function createTaskRunEventToLoadStat(workerUtilizationRunningAverageWindowSize) { const loadQueue = (0, _task_run_calcultors.createRunningAveragedStat)(workerUtilizationRunningAverageWindowSize); return load => { const historicalLoad = loadQueue(load); return { load: _statsLite.default.mean(historicalLoad) }; }; } function getServiceTimeStats(timing, pollInterval) { const duration = timing.stop - timing.start; const adjusted = Math.ceil(duration / pollInterval) * pollInterval; return { duration, adjusted }; }