"use strict";

var _interopRequireDefault = require("@babel/runtime/helpers/interopRequireDefault");
Object.defineProperty(exports, "__esModule", {
  value: true
});
exports.EventStreamService = void 0;
var _defineProperty2 = _interopRequireDefault(require("@babel/runtime/helpers/defineProperty"));
var _classPrivateMethodGet2 = _interopRequireDefault(require("@babel/runtime/helpers/classPrivateMethodGet"));
var _classPrivateFieldGet2 = _interopRequireDefault(require("@babel/runtime/helpers/classPrivateFieldGet"));
var _classPrivateFieldSet2 = _interopRequireDefault(require("@babel/runtime/helpers/classPrivateFieldSet"));
var _common = require("@kbn/bfetch-plugin/common");
var _validation = require("./validation");

/**
 * Throws a TypeError when `obj` is already registered in `privateCollection`;
 * otherwise does nothing.
 */
function _checkPrivateRedeclaration(obj, privateCollection) {
  if (!privateCollection.has(obj)) {
    return;
  }
  throw new TypeError("Cannot initialize the same private elements twice on an object");
}

/** Registers `obj` in the WeakSet that brands instances owning a private method. */
function _classPrivateMethodInitSpec(obj, privateSet) {
  _checkPrivateRedeclaration(obj, privateSet);
  privateSet.add(obj);
}

/** Stores the private-field descriptor `value` for `obj` in the given WeakMap. */
function _classPrivateFieldInitSpec(obj, privateMap, value) {
  _checkPrivateRedeclaration(obj, privateMap);
  privateMap.set(obj, value);
}

// Backing stores for the private `buffer` field and the private `getClient` method.
var _buffer = /*#__PURE__*/new WeakMap();
var _getClient = /*#__PURE__*/new WeakSet();

/**
 * Buffers validated events and hands them to an event stream client created
 * by `ctx.clientFactory`.
 */
class EventStreamService {
  /**
   * @param ctx Service context; this class reads `logger` and `clientFactory`
   * from it.
   */
  constructor(ctx) {
    _classPrivateMethodInitSpec(this, _getClient);
    (0, _defineProperty2.default)(this, "client", void 0);
    _classPrivateFieldInitSpec(this, _buffer, {
      writable: true,
      value: void 0
    });
    this.ctx = ctx;

    // Flush handler: writes a batch through the client. A missing client or a
    // failed write is logged and never rethrown.
    const onFlush = async batch => {
      const { logger } = this.ctx;
      if (this.client) {
        try {
          await this.client.writeEvents(batch);
        } catch (err) {
          logger.error('Failed to write events to Event Stream.');
          logger.error(err);
        }
        return;
      }
      logger.error('EventStreamClient is not initialized, events will not be written.');
    };

    const eventBuffer = new _common.TimedItemBuffer({
      flushOnMaxItems: 100,
      maxItemAge: 250,
      onFlush
    });
    (0, _classPrivateFieldSet2.default)(this, _buffer, eventBuffer);
  }

  /**
   * "setup" plugin life-cycle hook: creates the client through
   * `ctx.clientFactory.create(core)`.
   */
  setup({ core }) {
    const { clientFactory } = this.ctx;
    this.client = clientFactory.create(core);
  }

  /**
   * "start" plugin life-cycle hook: starts client initialization without
   * waiting for it. The outcome is only logged.
   *
   * @throws Error if no client has been created yet.
   */
  start() {
    const { logger } = this.ctx;
    if (!this.client) {
      throw new Error('EventStreamClient not initialized.');
    }
    logger.debug('Initializing Event Stream.');

    const onReady = () => {
      logger.debug('Event Stream was initialized.');
    };
    const onFailure = error => {
      logger.error('Failed to initialize Event Stream. Events will not be indexed.');
      logger.error(error);
    };
    this.client.initialize().then(onReady).catch(onFailure);
  }

  /** "stop" plugin life-cycle hook: awaits the buffer's `flushAsync()`. */
  async stop() {
    const eventBuffer = (0, _classPrivateFieldGet2.default)(this, _buffer);
    await eventBuffer.flushAsync();
  }

  /**
   * Runs the partial-event validator against a single event.
   *
   * @param event A partial event to validate.
   * @throws Error when the validator reports errors; the message carries the
   * instance path and message of the first reported error.
   */
  validatePartialEvent(event) {
    const validate = _validation.partialEventValidator;
    validate(event);

    const problems = validate.errors;
    if (!problems) {
      return;
    }

    const first = problems[0];
    if (first) {
      throw new Error(`Validation error at [path = ${first.instancePath}]: ${first.message}`);
    }
    throw new Error('Validation failed.');
  }

  /**
   * Validates an event and appends it to the write buffer. When the event has
   * no truthy `time`, the current `Date.now()` value is used.
   *
   * The buffer is configured to flush at 100 items or after 250ms, whichever
   * comes first. To force a flush, call `.flush()`.
   *
   * @param event Event to add to the Event Stream.
   */
  addEvent(event) {
    this.validatePartialEvent(event);
    const stamped = {
      ...event,
      time: event.time || Date.now()
    };
    const eventBuffer = (0, _classPrivateFieldGet2.default)(this, _buffer);
    eventBuffer.write(stamped);
  }

  /**
   * Adds several events by calling `.addEvent()` on each, in iteration order.
   *
   * @param events Events to add to the Event Stream.
   */
  addEvents(events) {
    for (const item of events) {
      this.addEvent(item);
    }
  }

  /** Triggers a flush of the event buffer. */
  flush() {
    const eventBuffer = (0, _classPrivateFieldGet2.default)(this, _buffer);
    eventBuffer.flush();
  }

  /**
   * Reads the latest events through the client.
   *
   * @param limit Number of events to return. Defaults to 100.
   * @returns Whatever the client's `tail(limit)` resolves to.
   * @throws Error if no client has been created yet.
   */
  async tail(limit = 100) {
    const client = (0, _classPrivateMethodGet2.default)(this, _getClient, _getClient2).call(this);
    const latest = await client.tail(limit);
    return latest;
  }

  /**
   * Retrieves events matching the given filter options through the client.
   *
   * @param options Filtering options, passed to the client unchanged.
   * @returns Whatever the client's `filter(options)` resolves to.
   * @throws Error if no client has been created yet.
   */
  async filter(options) {
    const client = (0, _classPrivateMethodGet2.default)(this, _getClient, _getClient2).call(this);
    const matches = await client.filter(options);
    return matches;
  }
}
exports.EventStreamService = EventStreamService;

/**
 * Body of the private `getClient` method (invoked with the service as `this`).
 * Returns the client, or throws if it has not been created.
 */
function _getClient2() {
  const { client } = this;
  if (client) {
    return client;
  }
  throw new Error('EventStreamClient not initialized.');
}