"use strict"; Object.defineProperty(exports, "__esModule", { value: true }); exports.createStreamingBatchedFunction = void 0; var _public = require("@kbn/kibana-utils-plugin/public"); var _common = require("../../common"); var _streaming = require("../streaming"); /* * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one * or more contributor license agreements. Licensed under the Elastic License * 2.0 and the Server Side Public License, v 1; you may not use this file except * in compliance with, at your election, the Elastic License 2.0 or the Server * Side Public License, v 1. */ /** * Returns a function that does not execute immediately but buffers the call internally until * `params.flushOnMaxItems` is reached or after `params.maxItemAge` timeout in milliseconds is reached. Once * one of those thresholds is reached all buffered calls are sent in one batch to the * server using `params.fetchStreaming` in a POST request. Responses are streamed back * and each batch item is resolved once corresponding response is received. */ const createStreamingBatchedFunction = params => { const { url, fetchStreaming: fetchStreamingInjected = _streaming.fetchStreaming, flushOnMaxItems = 25, maxItemAge = 10, getIsCompressionDisabled = () => false } = params; const [fn] = (0, _common.createBatchedFunction)({ onCall: (payload, signal) => { const future = (0, _public.defer)(); const entry = { payload, future, signal }; return [future.promise, entry]; }, onBatch: async items => { try { // Filter out any items whose signal is already aborted items = items.filter(item => { var _item$signal, _item$signal2; if ((_item$signal = item.signal) !== null && _item$signal !== void 0 && _item$signal.aborted) item.future.reject(new _public.AbortError()); return !((_item$signal2 = item.signal) !== null && _item$signal2 !== void 0 && _item$signal2.aborted); }); if (items.length === 0) { return; // all items have been aborted before a request has been sent } const donePromises = items.map(item => { return new Promise(resolve => { const { promise: abortPromise, cleanup } = item.signal ? 
(0, _public.abortSignalToPromise)(item.signal) : { promise: undefined, cleanup: () => {} }; const onDone = () => { resolve(); cleanup(); }; if (abortPromise) abortPromise.catch(() => { item.future.reject(new _public.AbortError()); onDone(); }); item.future.promise.then(onDone, onDone); }); }); // abort when all items were either resolved, rejected or aborted const abortController = new AbortController(); let isBatchDone = false; Promise.all(donePromises).then(() => { isBatchDone = true; abortController.abort(); }); const batch = items.map(item => item.payload); const { stream } = fetchStreamingInjected({ url, body: JSON.stringify({ batch }), method: 'POST', signal: abortController.signal, getIsCompressionDisabled }); const handleStreamError = error => { const normalizedError = (0, _common.normalizeError)(error); normalizedError.code = 'STREAM'; for (const { future } of items) future.reject(normalizedError); }; stream.subscribe({ next: json => { try { const response = JSON.parse(json); if (response.error) { items[response.id].future.reject(response.error); } else if (response.result !== undefined) { items[response.id].future.resolve(response.result); } } catch (e) { handleStreamError(e); } }, error: handleStreamError, complete: () => { if (!isBatchDone) { const error = { message: 'Connection terminated prematurely.', code: 'CONNECTION' }; for (const { future } of items) future.reject(error); } } }); await stream.toPromise(); } catch (error) { for (const item of items) item.future.reject(error); } }, flushOnMaxItems, maxItemAge }); return fn; }; exports.createStreamingBatchedFunction = createStreamingBatchedFunction;
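
/*
 * Wire format sketch, inferred from the handlers above. The exact chunk
 * framing is an implementation detail of `fetchStreaming` and the server-side
 * batch handler; the payload and result shapes below are illustrative only.
 *
 *   Request body:  {"batch":[{"query":"foo"},{"query":"bar"}]}
 *
 *   Streamed response chunks, one JSON object per batch item, in whatever
 *   order the server finishes them; `id` is the item's index in `batch`:
 *
 *     {"id":1,"result":{...}}
 *     {"id":0,"error":{"message":"..."}}
 *
 * A chunk carrying `error` rejects that item's promise; one carrying `result`
 * resolves it. If the stream completes before every item has settled, the
 * remaining items are rejected with a `CONNECTION` error; a parse or
 * transport failure rejects all items with a `STREAM` error.
 */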
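
/*
 * Usage sketch, kept as a comment so the module's runtime behavior is
 * unchanged. The endpoint path '/internal/my-batch-endpoint' and the
 * `{ query }` payload shape are assumptions for illustration; the real
 * values come from whatever plugin registers the matching batch route.
 *
 *   const search = createStreamingBatchedFunction({
 *     url: '/internal/my-batch-endpoint',
 *     flushOnMaxItems: 25, // flush once 25 calls are buffered...
 *     maxItemAge: 10,      // ...or 10 ms after buffering, whichever is first
 *   });
 *
 *   const controller = new AbortController();
 *
 *   // Both calls are buffered into a single POST request; each returned
 *   // promise settles independently as its response chunk streams back,
 *   // and the optional signal aborts just that item.
 *   const [a, b] = await Promise.all([
 *     search({ query: 'foo' }, controller.signal),
 *     search({ query: 'bar' }),
 *   ]);
 */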