"use strict"; var _interopRequireDefault = require("@babel/runtime/helpers/interopRequireDefault"); Object.defineProperty(exports, "__esModule", { value: true }); exports.ElasticV3ServerShipper = void 0; var _defineProperty2 = _interopRequireDefault(require("@babel/runtime/helpers/defineProperty")); var _nodeFetch = _interopRequireDefault(require("node-fetch")); var _rxjs = require("rxjs"); var _analyticsShippersElasticV3Common = require("@kbn/analytics-shippers-elastic-v3-common"); /* * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one * or more contributor license agreements. Licensed under the Elastic License * 2.0 and the Server Side Public License, v 1; you may not use this file except * in compliance with, at your election, the Elastic License 2.0 or the Server * Side Public License, v 1. */ const SECOND = 1000; const MINUTE = 60 * SECOND; const HOUR = 60 * MINUTE; const KIB = 1024; const MAX_NUMBER_OF_EVENTS_IN_INTERNAL_QUEUE = 1000; const MIN_TIME_SINCE_LAST_SEND = 10 * SECOND; /** * Elastic V3 shipper to use on the server side. */ class ElasticV3ServerShipper { /** Shipper's unique name */ /** Observable to emit the stats of the processed events. */ /** * Specifies when it went offline: * - `undefined` means it doesn't know yet whether it is online or offline * - `null` means it's online * - `number` means it's offline since that time * @private */ /** * Creates a new instance of the {@link ElasticV3ServerShipper}. * @param options {@link ElasticV3ShipperOptions} * @param initContext {@link AnalyticsClientInitContext} */ constructor(options, initContext) { var _options$sendTo; (0, _defineProperty2.default)(this, "telemetryCounter$", new _rxjs.Subject()); (0, _defineProperty2.default)(this, "reportTelemetryCounters", (0, _analyticsShippersElasticV3Common.createTelemetryCounterHelper)(this.telemetryCounter$, ElasticV3ServerShipper.shipperName)); (0, _defineProperty2.default)(this, "internalQueue", []); (0, _defineProperty2.default)(this, "shutdown$", new _rxjs.ReplaySubject(1)); (0, _defineProperty2.default)(this, "flush$", new _rxjs.Subject()); (0, _defineProperty2.default)(this, "inFlightRequests$", new _rxjs.BehaviorSubject(0)); (0, _defineProperty2.default)(this, "isOptedIn$", new _rxjs.BehaviorSubject(undefined)); (0, _defineProperty2.default)(this, "url", void 0); (0, _defineProperty2.default)(this, "lastBatchSent", Date.now()); (0, _defineProperty2.default)(this, "clusterUuid", 'UNKNOWN'); (0, _defineProperty2.default)(this, "licenseId", void 0); (0, _defineProperty2.default)(this, "firstTimeOffline", void 0); this.options = options; this.initContext = initContext; this.url = (0, _analyticsShippersElasticV3Common.buildUrl)({ sendTo: (_options$sendTo = options.sendTo) !== null && _options$sendTo !== void 0 ? _options$sendTo : initContext.sendTo, channelName: options.channelName }); this.setInternalSubscriber(); this.checkConnectivity(); } /** * Uses the `cluster_uuid` and `license_id` from the context to hold them in memory for the generation of the headers * used later on in the HTTP request. * @param newContext The full new context to set {@link EventContext} */ extendContext(newContext) { if (newContext.cluster_uuid) { this.clusterUuid = newContext.cluster_uuid; } if (newContext.license_id) { this.licenseId = newContext.license_id; } } /** * When `false`, it flushes the internal queue and stops sending events. * @param isOptedIn `true` for resume sending events. `false` to stop. */ optIn(isOptedIn) { this.isOptedIn$.next(isOptedIn); if (isOptedIn === false) { this.internalQueue.length = 0; } } /** * Enqueues the events to be sent via the leaky bucket algorithm. * @param events batched events {@link Event} */ reportEvents(events) { // If opted out OR offline for longer than 24 hours, skip processing any events. if (this.isOptedIn$.value === false || this.firstTimeOffline && Date.now() - this.firstTimeOffline > 24 * HOUR) { return; } const freeSpace = MAX_NUMBER_OF_EVENTS_IN_INTERNAL_QUEUE - this.internalQueue.length; // As per design, we only want store up-to 1000 events at a time. Drop anything that goes beyond that limit if (freeSpace < events.length) { const toDrop = events.length - freeSpace; const droppedEvents = events.splice(-toDrop, toDrop); this.reportTelemetryCounters(droppedEvents, { type: 'dropped', code: 'queue_full' }); } this.internalQueue.push(...events); } /** * Triggers a flush of the internal queue to attempt to send any events held in the queue * and resolves the returned promise once the queue is emptied. */ async flush() { if (this.flush$.isStopped) { // If called after shutdown, return straight away return; } const promise = (0, _rxjs.firstValueFrom)(this.inFlightRequests$.pipe((0, _rxjs.skip)(1), // Skipping the first value because BehaviourSubjects always emit the current value on subscribe. (0, _rxjs.filter)(count => count === 0) // Wait until all the inflight requests are completed. )); this.flush$.next(); await promise; } /** * Shuts down the shipper. * Triggers a flush of the internal queue to attempt to send any events held in the queue. */ shutdown() { this.shutdown$.next(); this.flush$.complete(); this.shutdown$.complete(); this.isOptedIn$.complete(); } /** * Checks the server has connectivity to the remote endpoint. * The frequency of the connectivity tests will back off, starting with 1 minute, and multiplying by 2 * until it reaches 1 hour. Then, it’ll keep the 1h frequency until it reaches 24h without connectivity. * At that point, it clears the queue and stops accepting events in the queue. * The connectivity checks will continue to happen every 1 hour just in case it regains it at any point. * @private */ checkConnectivity() { let backoff = 1 * MINUTE; (0, _rxjs.merge)((0, _rxjs.timer)(0, 1 * MINUTE), // Also react to opt-in changes to avoid being stalled for 1 minute for the first connectivity check. // More details in: https://github.com/elastic/kibana/issues/135647 this.isOptedIn$).pipe((0, _rxjs.takeUntil)(this.shutdown$), (0, _rxjs.filter)(() => this.isOptedIn$.value === true && this.firstTimeOffline !== null), // Using exhaustMap here because one request at a time is enough to check the connectivity. (0, _rxjs.exhaustMap)(async () => { const { ok } = await (0, _nodeFetch.default)(this.url, { method: 'OPTIONS' }); if (!ok) { throw new Error(`Failed to connect to ${this.url}`); } this.firstTimeOffline = null; backoff = 1 * MINUTE; }), (0, _rxjs.retryWhen)(errors => errors.pipe((0, _rxjs.takeUntil)(this.shutdown$), (0, _rxjs.tap)(() => { if (!this.firstTimeOffline) { this.firstTimeOffline = Date.now(); } else if (Date.now() - this.firstTimeOffline > 24 * HOUR) { this.internalQueue.length = 0; } backoff = backoff * 2; if (backoff > 1 * HOUR) { backoff = 1 * HOUR; } }), (0, _rxjs.delayWhen)(() => (0, _rxjs.timer)(backoff))))).subscribe(); } setInternalSubscriber() { // Create an emitter that emits when MIN_TIME_SINCE_LAST_SEND have passed since the last time we sent the data const minimumTimeSinceLastSent$ = (0, _rxjs.interval)(SECOND).pipe((0, _rxjs.filter)(() => Date.now() - this.lastBatchSent >= MIN_TIME_SINCE_LAST_SEND)); (0, _rxjs.merge)(minimumTimeSinceLastSent$.pipe((0, _rxjs.takeUntil)(this.shutdown$), (0, _rxjs.map)(() => ({ shouldFlush: false }))), // Whenever a `flush` request comes in this.flush$.pipe((0, _rxjs.map)(() => ({ shouldFlush: true }))), // Attempt to send one last time on shutdown, flushing the queue this.shutdown$.pipe((0, _rxjs.map)(() => ({ shouldFlush: true })))).pipe( // Only move ahead if it's opted-in and online, and there are some events in the queue (0, _rxjs.filter)(() => { const shouldSendAnything = this.isOptedIn$.value === true && this.firstTimeOffline === null && this.internalQueue.length > 0; // If it should not send anything, re-emit the inflight request observable just in case it's already 0 if (!shouldSendAnything) { this.inFlightRequests$.next(this.inFlightRequests$.value); } return shouldSendAnything; }), // Send the events: // 1. Set lastBatchSent and retrieve the events to send (clearing the queue) in a synchronous operation to avoid race conditions. (0, _rxjs.map)(({ shouldFlush }) => { this.lastBatchSent = Date.now(); return this.getEventsToSend(shouldFlush); }), // 2. Skip empty buffers (just to be sure) (0, _rxjs.filter)(events => events.length > 0), // 3. Actually send the events // Using `mergeMap` here because we want to send events whenever the emitter says so: // We don't want to skip emissions (exhaustMap) or enqueue them (concatMap). (0, _rxjs.mergeMap)(eventsToSend => this.sendEvents(eventsToSend))).subscribe(); } /** * Calculates the size of the queue in bytes. * @returns The number of bytes held in the queue. * @private */ getQueueByteSize(queue) { return queue.reduce((acc, event) => { return acc + this.getEventSize(event); }, 0); } /** * Calculates the size of the event in bytes. * @param event The event to calculate the size of. * @returns The number of bytes held in the event. * @private */ getEventSize(event) { return Buffer.from(JSON.stringify(event)).length; } /** * Returns a queue of events of up-to 10kB. Or all events in the queue if it's a FLUSH action. * @remarks It mutates the internal queue by removing from it the events returned by this method. * @private */ getEventsToSend(shouldFlush) { // If the internal queue is already smaller than the minimum batch size, or it's a flush action, do a direct assignment. if (shouldFlush || this.getQueueByteSize(this.internalQueue) < 10 * KIB) { return this.internalQueue.splice(0, this.internalQueue.length); } // Otherwise, we'll feed the events to the leaky bucket queue until we reach 10kB. const queue = []; let queueByteSize = 0; while (queueByteSize < 10 * KIB) { const event = this.internalQueue.shift(); queueByteSize += this.getEventSize(event); queue.push(event); } return queue; } async sendEvents(events) { this.initContext.logger.debug(`Reporting ${events.length} events...`); this.inFlightRequests$.next(this.inFlightRequests$.value + 1); try { const code = await this.makeRequest(events); this.reportTelemetryCounters(events, { code }); this.initContext.logger.debug(`Reported ${events.length} events...`); } catch (error) { this.initContext.logger.debug(`Failed to report ${events.length} events...`); this.initContext.logger.debug(error); this.reportTelemetryCounters(events, { code: error.code, error }); this.firstTimeOffline = undefined; } this.inFlightRequests$.next(Math.max(0, this.inFlightRequests$.value - 1)); } async makeRequest(events) { const response = await (0, _nodeFetch.default)(this.url, { method: 'POST', body: (0, _analyticsShippersElasticV3Common.eventsToNDJSON)(events), headers: (0, _analyticsShippersElasticV3Common.buildHeaders)(this.clusterUuid, this.options.version, this.licenseId), ...(this.options.debug && { query: { debug: true } }) }); if (this.options.debug) { this.initContext.logger.debug(`${response.status} - ${await response.text()}`); } if (!response.ok) { throw new _analyticsShippersElasticV3Common.ErrorWithCode(`${response.status} - ${await response.text()}`, `${response.status}`); } return `${response.status}`; } } exports.ElasticV3ServerShipper = ElasticV3ServerShipper; (0, _defineProperty2.default)(ElasticV3ServerShipper, "shipperName", 'elastic_v3_server');