"use strict"; var _interopRequireDefault = require("@babel/runtime/helpers/interopRequireDefault"); Object.defineProperty(exports, "__esModule", { value: true }); exports.PollingErrorType = exports.PollingError = void 0; exports.createTaskPoller = createTaskPoller; var _defineProperty2 = _interopRequireDefault(require("@babel/runtime/helpers/defineProperty")); var _rxjs = require("rxjs"); var _Option = require("fp-ts/lib/Option"); var _result_type = require("../lib/result_type"); /* * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one * or more contributor license agreements. Licensed under the Elastic License * 2.0; you may not use this file except in compliance with the Elastic License * 2.0. */ /* * This module contains the logic for polling the task manager index for new work. */ /** * constructs a new TaskPoller stream, which emits events on demand and on a scheduled interval, waiting for capacity to be available before emitting more events. * * @param opts * @prop {number} pollInterval - How often, in milliseconds, we will an event be emnitted, assuming there's capacity to do so * @prop {() => number} getCapacity - A function specifying whether there is capacity to emit new events * @prop {() => Promise} work - The worker we wish to execute in order to `poll` * * @returns {Observable>} - An observable which emits an event whenever a polling event is due to take place, providing access to a singleton Set representing a queue * of unique request argumets of type T. */ function createTaskPoller({ logger, initialPollInterval, pollInterval$, pollIntervalDelay$, getCapacity, work }) { const hasCapacity = () => getCapacity() > 0; let running = false; let timeoutId = null; let hasSubscribed = false; let pollInterval = initialPollInterval; let pollIntervalDelay = 0; const subject = new _rxjs.Subject(); async function runCycle() { timeoutId = null; const start = Date.now(); if (hasCapacity()) { try { const result = await work(); subject.next((0, _result_type.asOk)(result)); } catch (e) { subject.next(asPollingError(e, PollingErrorType.WorkError)); } } if (running) { // Set the next runCycle call timeoutId = setTimeout(runCycle, Math.max(pollInterval - (Date.now() - start) + pollIntervalDelay % pollInterval, 0)); // Reset delay, it's designed to shuffle only once pollIntervalDelay = 0; } } function subscribe() { if (hasSubscribed) { return; } pollInterval$.subscribe(interval => { pollInterval = interval; logger.debug(`Task poller now using interval of ${interval}ms`); }); pollIntervalDelay$.subscribe(delay => { pollIntervalDelay = delay; logger.debug(`Task poller now delaying emission by ${delay}ms`); }); hasSubscribed = true; } return { events$: subject, start: () => { if (!running) { running = true; runCycle(); // We need to subscribe shortly after start. Otherwise, the observables start emiting events // too soon for the task run statistics module to capture. setTimeout(() => subscribe(), 0); } }, stop: () => { if (timeoutId) { clearTimeout(timeoutId); timeoutId = null; } running = false; } }; } let PollingErrorType; exports.PollingErrorType = PollingErrorType; (function (PollingErrorType) { PollingErrorType[PollingErrorType["WorkError"] = 0] = "WorkError"; PollingErrorType[PollingErrorType["WorkTimeout"] = 1] = "WorkTimeout"; PollingErrorType[PollingErrorType["RequestCapacityReached"] = 2] = "RequestCapacityReached"; })(PollingErrorType || (exports.PollingErrorType = PollingErrorType = {})); function asPollingError(err, type, data = _Option.none) { return (0, _result_type.asErr)(new PollingError(`Failed to poll for work: ${err}`, type, data)); } class PollingError extends Error { constructor(message, type, data) { super(message); (0, _defineProperty2.default)(this, "type", void 0); (0, _defineProperty2.default)(this, "data", void 0); Object.setPrototypeOf(this, new.target.prototype); this.type = type; this.data = data; } } exports.PollingError = PollingError;