"use strict"; var _interopRequireDefault = require("@babel/runtime/helpers/interopRequireDefault"); Object.defineProperty(exports, "__esModule", { value: true }); exports.AnalyticsClient = void 0; var _defineProperty2 = _interopRequireDefault(require("@babel/runtime/helpers/defineProperty")); var _rxjs = require("rxjs"); var _operators = require("rxjs/operators"); var _shippers_registry = require("./shippers_registry"); var _opt_in_config = require("./opt_in_config"); var _context_service = require("./context_service"); var _validation = require("../schema/validation"); /* * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one * or more contributor license agreements. Licensed under the Elastic License * 2.0 and the Server Side Public License, v 1; you may not use this file except * in compliance with, at your election, the Elastic License 2.0 or the Server * Side Public License, v 1. */ class AnalyticsClient { // Using `share` so we can have multiple subscribers /** * This queue holds all the events until both conditions occur: * 1. We know the user's optIn decision. * 2. We have, at least, one registered shipper. * @private */ /** * Observable used to report when a shipper is registered. * @private */ constructor(initContext) { (0, _defineProperty2.default)(this, "internalTelemetryCounter$", new _rxjs.Subject()); (0, _defineProperty2.default)(this, "telemetryCounter$", this.internalTelemetryCounter$.pipe((0, _operators.share)())); (0, _defineProperty2.default)(this, "internalEventQueue$", new _rxjs.Subject()); (0, _defineProperty2.default)(this, "shippersRegistry", new _shippers_registry.ShippersRegistry()); (0, _defineProperty2.default)(this, "shipperRegistered$", new _rxjs.Subject()); (0, _defineProperty2.default)(this, "eventTypeRegistry", new Map()); (0, _defineProperty2.default)(this, "contextService", void 0); (0, _defineProperty2.default)(this, "context$", new _rxjs.BehaviorSubject({})); (0, _defineProperty2.default)(this, "optInConfig$", new _rxjs.BehaviorSubject(undefined)); (0, _defineProperty2.default)(this, "optInConfigWithReplay$", this.optInConfig$.pipe((0, _operators.filter)(optInConfig => typeof optInConfig !== 'undefined'), (0, _operators.shareReplay)(1))); (0, _defineProperty2.default)(this, "contextWithReplay$", this.context$.pipe((0, _operators.skipWhile)(() => !this.optInConfig$.value), // Do not forward the context events until we have an optInConfig value (0, _operators.shareReplay)(1))); (0, _defineProperty2.default)(this, "reportEvent", (eventType, eventData) => { // Fetch the timestamp as soon as we receive the event. const timestamp = new Date().toISOString(); this.internalTelemetryCounter$.next({ type: 'enqueued', source: 'client', event_type: eventType, code: 'enqueued', count: 1 }); const eventTypeOpts = this.eventTypeRegistry.get(eventType); if (!eventTypeOpts) { this.internalTelemetryCounter$.next({ type: 'dropped', source: 'client', event_type: eventType, code: 'UnregisteredType', count: 1 }); throw new Error(`Attempted to report event type "${eventType}", before registering it. Use the "registerEventType" API to register it.`); } // If the validator is registered (dev-mode only), perform the validation. if (eventTypeOpts.validator) { (0, _validation.validateSchema)(`Event Type '${eventType}'`, eventTypeOpts.validator, eventData); } const event = { timestamp, event_type: eventType, context: this.context$.value, properties: eventData }; // debug-logging before checking the opt-in status to help during development if (this.initContext.isDev) { this.initContext.logger.debug(`Report event "${eventType}"`, { ebt_event: event }); } const optInConfig = this.optInConfig$.value; if ((optInConfig === null || optInConfig === void 0 ? void 0 : optInConfig.isEventTypeOptedIn(eventType)) === false) { // If opted out, skip early return; } if (typeof optInConfig === 'undefined') { // If the opt-in config is not provided yet, we need to enqueue the event to an internal queue this.internalEventQueue$.next(event); } else { this.sendToShipper(eventType, [event]); } }); (0, _defineProperty2.default)(this, "registerEventType", eventTypeOps => { if (this.eventTypeRegistry.get(eventTypeOps.eventType)) { throw new Error(`Event Type "${eventTypeOps.eventType}" is already registered.`); } this.eventTypeRegistry.set(eventTypeOps.eventType, { ...eventTypeOps, validator: this.initContext.isDev ? (0, _validation.schemaToIoTs)(eventTypeOps.schema) : undefined }); }); (0, _defineProperty2.default)(this, "optIn", optInConfig => { const optInConfigInstance = new _opt_in_config.OptInConfigService(optInConfig); this.optInConfig$.next(optInConfigInstance); }); (0, _defineProperty2.default)(this, "registerContextProvider", contextProviderOpts => { this.contextService.registerContextProvider(contextProviderOpts); }); (0, _defineProperty2.default)(this, "removeContextProvider", name => { this.contextService.removeContextProvider(name); }); (0, _defineProperty2.default)(this, "registerShipper", (ShipperClass, shipperConfig, { exclusiveEventTypes = [] } = {}) => { var _shipper$telemetryCou; const shipperName = ShipperClass.shipperName; const shipper = new ShipperClass(shipperConfig, { ...this.initContext, logger: this.initContext.logger.get('shipper', shipperName) }); if (exclusiveEventTypes.length) { // This feature is not intended to be supported in the MVP. // I can remove it if we think it causes more bad than good. exclusiveEventTypes.forEach(eventType => { this.shippersRegistry.addEventExclusiveShipper(eventType, shipperName, shipper); }); } else { this.shippersRegistry.addGlobalShipper(shipperName, shipper); } // Subscribe to the shipper's telemetryCounter$ and pass it over to the client's-level observable (_shipper$telemetryCou = shipper.telemetryCounter$) === null || _shipper$telemetryCou === void 0 ? void 0 : _shipper$telemetryCou.subscribe(counter => this.internalTelemetryCounter$.next({ ...counter, source: shipperName // Enforce the shipper's name in the `source` })); // Spread the optIn configuration updates this.optInConfigWithReplay$.subscribe(optInConfig => { const isOptedIn = optInConfig.isShipperOptedIn(shipperName); try { shipper.optIn(isOptedIn); } catch (err) { this.initContext.logger.warn(`Failed to set isOptedIn:${isOptedIn} in shipper ${shipperName}`, err); } }); // Spread the global context if it has custom extendContext method if (shipper.extendContext) { this.contextWithReplay$.subscribe(context => { try { shipper.extendContext(context); } catch (err) { this.initContext.logger.warn(`Shipper "${shipperName}" failed to extend the context`, err); } }); } // Notify that a shipper is registered this.shipperRegistered$.next(); }); (0, _defineProperty2.default)(this, "flush", async () => { await Promise.all([...this.shippersRegistry.allShippers.entries()].map(async ([shipperName, shipper]) => { try { await shipper.flush(); } catch (err) { this.initContext.logger.warn(`Failed to flush shipper "${shipperName}"`, err); } })); }); (0, _defineProperty2.default)(this, "shutdown", async () => { await this.flush(); this.shippersRegistry.allShippers.forEach((shipper, shipperName) => { try { shipper.shutdown(); } catch (err) { this.initContext.logger.warn(`Failed to shutdown shipper "${shipperName}"`, err); } }); this.internalEventQueue$.complete(); this.internalTelemetryCounter$.complete(); this.shipperRegistered$.complete(); this.optInConfig$.complete(); this.context$.complete(); }); this.initContext = initContext; this.contextService = new _context_service.ContextService(this.context$, this.initContext.isDev, this.initContext.logger.get('context-service')); this.reportEnqueuedEventsWhenClientIsReady(); } /** * Forwards the `events` to the registered shippers, bearing in mind if the shipper is opted-in for that eventType. * @param eventType The event type's name * @param events A bulk array of events matching the eventType. * @private */ sendToShipper(eventType, events) { let sentToShipper = false; this.shippersRegistry.getShippersForEventType(eventType).forEach((shipper, shipperName) => { var _this$optInConfig$$va; const isShipperOptedIn = (_this$optInConfig$$va = this.optInConfig$.value) === null || _this$optInConfig$$va === void 0 ? void 0 : _this$optInConfig$$va.isShipperOptedIn(shipperName, eventType); // Only send it to the non-explicitly opted-out shippers if (isShipperOptedIn) { sentToShipper = true; try { shipper.reportEvents(events); } catch (err) { this.initContext.logger.warn(`Failed to report event "${eventType}" via shipper "${shipperName}"`, err); } } }); if (sentToShipper) { this.internalTelemetryCounter$.next({ type: 'sent_to_shipper', source: 'client', event_type: eventType, code: 'OK', count: events.length }); } } /** * Once the client is ready (it has a valid optInConfig and at least one shipper), * flush any early events and ship them or discard them based on the optInConfig. * @private */ reportEnqueuedEventsWhenClientIsReady() { // Observer that will emit when both events occur: the OptInConfig is set + a shipper has been registered const configReceivedAndShipperReceivedObserver$ = (0, _rxjs.combineLatest)([this.optInConfigWithReplay$, (0, _rxjs.merge)([this.shipperRegistered$, // Merging shipperRegistered$ with the optInConfigWithReplay$ when optedIn is false, so that we don't need to wait for the shipper if opted-in === false this.optInConfigWithReplay$.pipe((0, _operators.filter)(cfg => (cfg === null || cfg === void 0 ? void 0 : cfg.isOptedIn()) === false))])]); // Flush the internal queue when we get any optInConfig and, at least, 1 shipper this.internalEventQueue$.pipe( // Take until will close the observer once we reach the condition below (0, _operators.takeUntil)(configReceivedAndShipperReceivedObserver$), // Accumulate the events until we can send them (0, _operators.buffer)(configReceivedAndShipperReceivedObserver$), // Minimal delay only to make this chain async and let the optIn operation to complete first. (0, _operators.delay)(0), // Re-emit the context to make sure all the shippers got it (only if opted-in) (0, _operators.tap)(() => { var _this$optInConfig$$va2; if ((_this$optInConfig$$va2 = this.optInConfig$.value) !== null && _this$optInConfig$$va2 !== void 0 && _this$optInConfig$$va2.isOptedIn()) { this.context$.next(this.context$.value); } }), // Minimal delay only to make this chain async and let // the context update operation to complete first. (0, _operators.delay)(0), // Flatten the array of events (0, _operators.concatMap)(events => (0, _rxjs.from)(events)), // Discard opted-out events (0, _operators.filter)(event => { var _this$optInConfig$$va3; return ((_this$optInConfig$$va3 = this.optInConfig$.value) === null || _this$optInConfig$$va3 === void 0 ? void 0 : _this$optInConfig$$va3.isEventTypeOptedIn(event.event_type)) === true; }), // Let's group the requests per eventType for easier batching (0, _operators.groupBy)(event => event.event_type), (0, _operators.mergeMap)(groupedObservable => groupedObservable.pipe((0, _operators.bufferCount)(1000), // Batching up-to 1000 events per event type for backpressure reasons (0, _operators.map)(events => ({ eventType: groupedObservable.key, events }))))).subscribe(({ eventType, events }) => { this.sendToShipper(eventType, events); }); } } exports.AnalyticsClient = AnalyticsClient;