"use strict";

Object.defineProperty(exports, "__esModule", {
  value: true
});
exports.FunctionArgsValidationError = void 0;
exports.createChatService = createChatService;
var _jsonSchema = require("@cfworker/json-schema");
var _common = require("@kbn/kibana-utils-plugin/common");
var _lodash = require("lodash");
var _rxjs = require("rxjs");
var _types = require("../../common/types");
var _readable_stream_reader_into_observable = require("../utils/readable_stream_reader_into_observable");
/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the Elastic License
 * 2.0; you may not use this file except in compliance with the Elastic License
 * 2.0.
 */

/* eslint-disable max-classes-per-file*/

/**
 * Thrown inside the chat stream when a completion chunk reports
 * `finish_reason === 'length'`.
 */
class TokenLimitReachedError extends Error {
  constructor() {
    super(`Token limit reached`);
  }
}

/** Thrown inside the chat stream when a streamed line carries an `error` property. */
class ServerError extends Error {}

/**
 * Thrown by `executeFunction` when the parsed arguments fail validation
 * against the schema the function was registered with.
 */
class FunctionArgsValidationError extends Error {
  /**
   * @param {Array} errors - The `errors` list reported by the schema validator.
   */
  constructor(errors) {
    super('Function arguments are invalid');
    this.errors = errors;
  }
}
exports.FunctionArgsValidationError = FunctionArgsValidationError;

/**
 * Best-effort enrichment of a failed request's error with its JSON response body.
 *
 * When `err.response` exposes a `json()` method, the parsed body is stored on
 * `err.body` and, if the body has a truthy `message`, that message replaces
 * `err.message`. This helper never throws: a missing response, an unreadable
 * or non-JSON body, or a `null` body must not mask the original error.
 *
 * @param {*} err - The rejection value from the chat request.
 * @returns {Promise<void>}
 */
async function attachResponseBody(err) {
  // Primitive rejection values cannot carry a response (and `in` would throw on them).
  if (err === null || typeof err !== 'object') {
    return;
  }
  const response = err.response;
  if (!response || typeof response.json !== 'function') {
    return;
  }
  let body;
  try {
    body = await response.json();
  } catch (parseError) {
    // Body is not JSON or cannot be read; keep the original error untouched.
    return;
  }
  err.body = body;
  if (body && body.message) {
    err.message = body.message;
  }
}

/**
 * Builds the chat service: runs every registration callback so it can register
 * contexts and functions, then returns an object for executing/rendering those
 * functions and for starting a streamed chat request.
 *
 * @param {object} options
 * @param {AbortSignal} options.signal - Passed through to each registration callback.
 * @param {Function[]} options.registrations - Callbacks invoked with
 *   `{ signal, registerContext, registerFunction }`; all are awaited before returning.
 * @param {Function} options.client - Request function used to POST the chat request.
 * @returns {Promise<object>} Service exposing `executeFunction`, `renderFunction`,
 *   `getContexts`, `getFunctions`, `hasRenderFunction` and `chat`.
 */
async function createChatService({
  signal: setupAbortSignal,
  registrations,
  client
}) {
  const contextRegistry = new Map();
  const functionRegistry = new Map();
  const validators = new Map();

  const registerContext = context => {
    contextRegistry.set(context.name, context);
  };

  // A validator is compiled once per function, at registration time.
  const registerFunction = (def, respond, render) => {
    validators.set(def.name, new _jsonSchema.Validator(def.parameters, '2020-12', true));
    functionRegistry.set(def.name, {
      options: def,
      respond,
      render
    });
  };

  const getContexts = () => {
    return Array.from(contextRegistry.values());
  };

  /**
   * Lists registered functions, optionally narrowed to those sharing at least
   * one context with `contexts` and/or whose name or description contains `filter`.
   */
  const getFunctions = ({
    contexts,
    filter
  } = {}) => {
    const allFunctions = Array.from(functionRegistry.values());
    if (!contexts && !filter) {
      return allFunctions;
    }
    return allFunctions.filter(fn => {
      const matchesContext = !contexts || fn.options.contexts.some(context => contexts.includes(context));
      const matchesFilter = !filter || fn.options.name.includes(filter) || fn.options.description.includes(filter);
      return matchesContext && matchesFilter;
    });
  };

  await Promise.all(registrations.map(fn => fn({
    signal: setupAbortSignal,
    registerContext,
    registerFunction
  })));

  /** Throws FunctionArgsValidationError when `parameters` do not satisfy the function's schema. */
  function validate(name, parameters) {
    const validator = validators.get(name);
    const result = validator.validate(parameters);
    if (!result.valid) {
      throw new FunctionArgsValidationError(result.errors);
    }
  }

  return {
    /**
     * Parses `args` (a JSON string; empty/absent means `{}`), validates them and
     * calls the function's `respond` handler.
     */
    executeFunction: async ({
      name,
      args,
      signal,
      messages
    }) => {
      const fn = functionRegistry.get(name);
      if (!fn) {
        throw new Error(`Function ${name} not found`);
      }
      const parsedArguments = args ? JSON.parse(args) : {};
      validate(name, parsedArguments);
      return await fn.respond({
        arguments: parsedArguments,
        messages
      }, signal);
    },

    /**
     * Calls the function's optional `render` handler with the parsed arguments and
     * the parsed `content`/`data` of `response`; returns `undefined` when the
     * function has no renderer.
     */
    renderFunction: (name, args, response) => {
      const fn = functionRegistry.get(name);
      if (!fn) {
        throw new Error(`Function ${name} not found`);
      }
      const parsedArguments = args ? JSON.parse(args) : {};
      const parsedResponse = {
        content: JSON.parse(response.content != null ? response.content : '{}'),
        data: JSON.parse(response.data != null ? response.data : '{}')
      };
      const render = fn.render;
      if (render == null) {
        return undefined;
      }
      return render.call(fn, {
        response: parsedResponse,
        arguments: parsedArguments
      });
    },

    getContexts,
    getFunctions,

    hasRenderFunction: name => {
      const match = getFunctions().find(fn => fn.options.name === name);
      return !!(match != null && match.render);
    },

    /**
     * Starts a streamed chat request and returns an observable of the
     * progressively accumulated assistant message. Failures are emitted as a
     * value carrying `error` (and `aborted`) rather than as an observable error.
     */
    chat({
      connectorId,
      messages,
      function: callFunctions = 'auto'
    }) {
      const subject = new _rxjs.BehaviorSubject({
        message: {
          role: _types.MessageRole.Assistant
        }
      });
      const contexts = ['core', 'apm'];
      const functions = getFunctions({
        contexts
      });
      const controller = new AbortController();

      client('POST /internal/observability_ai_assistant/chat', {
        params: {
          body: {
            messages,
            connectorId,
            functions: callFunctions === 'none' ? [] : functions.map(fn => (0, _lodash.pick)(fn.options, 'name', 'description', 'parameters'))
          }
        },
        signal: controller.signal,
        asResponse: true,
        rawResponse: true
      }).then(_response => {
        const response = _response;
        const httpResponse = response.response;
        const status = httpResponse == null ? undefined : httpResponse.status;
        if (!status || status >= 400) {
          throw new Error((httpResponse == null ? undefined : httpResponse.statusText) || 'Unexpected error');
        }

        const responseBody = httpResponse.body;
        const reader = responseBody == null ? undefined : responseBody.getReader();
        if (!reader) {
          throw new Error('Could not get reader from response');
        }

        const subscription = (0, _readable_stream_reader_into_observable.readableStreamReaderIntoObservable)(reader).pipe(
          // Drops the first 6 characters of each line (presumably the `data: ` prefix — confirm against the server).
          (0, _rxjs.map)(line => line.substring(6)),
          (0, _rxjs.filter)(line => !!line && line !== '[DONE]'),
          (0, _rxjs.map)(line => JSON.parse(line)),
          (0, _rxjs.tap)(line => {
            if ('error' in line) {
              throw new ServerError(line.error.message);
            }
          }),
          (0, _rxjs.filter)(line => 'object' in line && line.object === 'chat.completion.chunk'),
          (0, _rxjs.tap)(line => {
            if (line.choices[0].finish_reason === 'length') {
              throw new TokenLimitReachedError();
            }
          }),
          // Append each chunk's delta to the running message; emit a deep copy so
          // consumers never observe later mutations of the accumulator.
          (0, _rxjs.scan)((acc, {
            choices
          }) => {
            const delta = choices[0].delta;
            acc.message.content += delta.content != null ? delta.content : '';
            const nameDelta = delta.function_call;
            acc.message.function_call.name += nameDelta != null && nameDelta.name != null ? nameDelta.name : '';
            const argsDelta = delta.function_call;
            acc.message.function_call.arguments += argsDelta != null && argsDelta.arguments != null ? argsDelta.arguments : '';
            return (0, _lodash.cloneDeep)(acc);
          }, {
            message: {
              content: '',
              function_call: {
                name: '',
                arguments: '',
                trigger: _types.MessageRole.Assistant
              },
              role: _types.MessageRole.Assistant
            }
          }),
          // Stream errors become a final value instead of erroring the subject.
          (0, _rxjs.catchError)(error => (0, _rxjs.of)({
            ...subject.value,
            error,
            aborted: error instanceof _common.AbortError || controller.signal.aborted
          }))
        ).subscribe(subject);

        controller.signal.addEventListener('abort', () => {
          subscription.unsubscribe();
          subject.next({
            ...subject.value,
            aborted: true
          });
          subject.complete();
        });
      }).catch(async err => {
        // Enrichment is best-effort; the original error is always the one rethrown.
        await attachResponseBody(err);
        throw err;
      }).catch(err => {
        subject.next({
          ...subject.value,
          aborted: false,
          error: err
        });
        subject.complete();
      });

      // NOTE(review): `finalize` sits after `shareReplay`, so the request is
      // aborted as soon as any single subscriber finishes — confirm this is intended.
      return subject.pipe(
        (0, _rxjs.concatMap)(value => (0, _rxjs.of)(value).pipe((0, _rxjs.delay)(50))),
        (0, _rxjs.shareReplay)(1),
        (0, _rxjs.finalize)(() => {
          controller.abort();
        })
      );
    }
  };
}