From 7a03119c5cfc0edabc163a22e7be6af2780211e1 Mon Sep 17 00:00:00 2001 From: Richard Cox Date: Mon, 1 Aug 2022 12:04:41 +0100 Subject: [PATCH 1/2] Revert "Merge pull request #6553 from mantis-toboggan-md/revert-6421" This reverts commit c86abfae239752e6a3fedf834724f87c948fdefd, reversing changes made to 57f9a3706cae843cab4024d8118e4f42c213973c. --- package.json | 2 + shell/components/nav/Header.vue | 2 +- shell/package.json | 2 + shell/plugins/dashboard-store/actions.js | 72 +++++++------ shell/plugins/dashboard-store/index.js | 6 +- shell/plugins/steve/actions.js | 8 +- shell/plugins/steve/index.js | 7 +- shell/plugins/steve/mutations.js | 12 ++- shell/plugins/steve/subscribe.js | 88 ++++++++++++++-- shell/plugins/steve/worker.js | 129 +++++++++++++++++++++++ yarn.lock | 13 +++ 11 files changed, 294 insertions(+), 47 deletions(-) create mode 100644 shell/plugins/steve/worker.js diff --git a/package.json b/package.json index 70267318c6..fdfca9c02f 100644 --- a/package.json +++ b/package.json @@ -60,6 +60,7 @@ "ansi_up": "^5.0.0", "babel-plugin-module-resolver": "^4.0.0", "browser-env": "^3.2.6", + "comlink": "^4.3.1", "cookie": "^0.4.0", "cookie-universal-nuxt": "^2.0.17", "cron-validator": "^1.2.0", @@ -178,6 +179,7 @@ "vue-template-compiler": "2.6.14", "webpack-bundle-analyzer": "^4.5.0", "webpack-virtual-modules": "^0.4.3", + "worker-loader": "^3.0.8", "yaml-lint": "^1", "yarn": "^1.22.11" }, diff --git a/shell/components/nav/Header.vue b/shell/components/nav/Header.vue index 26961e6964..ba7fac0308 100644 --- a/shell/components/nav/Header.vue +++ b/shell/components/nav/Header.vue @@ -331,7 +331,7 @@ export default {
diff --git a/shell/package.json b/shell/package.json index 072ae06863..f7c3713328 100644 --- a/shell/package.json +++ b/shell/package.json @@ -63,6 +63,7 @@ "babel-plugin-module-resolver": "^4.0.0", "babel-preset-vue": "^2.0.2", "browser-env": "^3.2.6", + "comlink": "^4.3.1", "cookie": "^0.4.0", "cookie-universal-nuxt": "^2.0.17", "core-js": "^3.20.3", @@ -148,6 +149,7 @@ "xterm-addon-search": "^0.7.0", "xterm-addon-web-links": "^0.4.0", "xterm-addon-webgl": "^0.9.0", + "worker-loader": "^3.0.8", "yarn": "^1.22.11" }, "nyc": { diff --git a/shell/plugins/dashboard-store/actions.js b/shell/plugins/dashboard-store/actions.js index fcdaac0d04..c45bbdff47 100644 --- a/shell/plugins/dashboard-store/actions.js +++ b/shell/plugins/dashboard-store/actions.js @@ -34,46 +34,48 @@ export async function handleSpoofedRequest(rootGetters, schemaStore, opt, produc } } +export async function loadSchemas(ctx, watch = true) { + const { + getters, dispatch, commit, rootGetters + } = ctx; + const res = await dispatch('findAll', { type: SCHEMA, opt: { url: 'schemas', load: false } }); + const spoofedTypes = rootGetters['type-map/allSpoofedSchemas'] ; + + if (Array.isArray(res.data)) { + res.data = res.data.concat(spoofedTypes); + } else if (Array.isArray(res)) { + res.data = res.concat(spoofedTypes); + } + + res.data.forEach((schema) => { + schema._id = normalizeType(schema.id); + schema._group = normalizeType(schema.attributes?.group); + }); + + commit('loadAll', { + ctx, + type: SCHEMA, + data: res.data + }); + + if ( watch !== false ) { + dispatch('watch', { + type: SCHEMA, + revision: res.revision + }); + } + + const all = getters.all(SCHEMA); + + return all; +} + export default { request() { throw new Error('Not Implemented'); }, - async loadSchemas(ctx, watch = true) { - const { - getters, dispatch, commit, rootGetters - } = ctx; - const res = await dispatch('findAll', { type: SCHEMA, opt: { url: 'schemas', load: false } }); - const spoofedTypes = rootGetters['type-map/allSpoofedSchemas'] ; - - if (Array.isArray(res.data)) { - res.data = res.data.concat(spoofedTypes); - } else if (Array.isArray(res)) { - res.data = res.concat(spoofedTypes); - } - - res.data.forEach((schema) => { - schema._id = normalizeType(schema.id); - schema._group = normalizeType(schema.attributes?.group); - }); - - commit('loadAll', { - ctx, - type: SCHEMA, - data: res.data - }); - - if ( watch !== false ) { - dispatch('watch', { - type: SCHEMA, - revision: res.revision - }); - } - - const all = getters.all(SCHEMA); - - return all; - }, + loadSchemas, async findAll(ctx, { type, opt }) { const { diff --git a/shell/plugins/dashboard-store/index.js b/shell/plugins/dashboard-store/index.js index 9e2f3bfa75..910c3418e0 100644 --- a/shell/plugins/dashboard-store/index.js +++ b/shell/plugins/dashboard-store/index.js @@ -29,7 +29,7 @@ export const coreStoreState = (namespace, baseUrl, isClusterStore) => ({ types: {}, }); -export default (vuexModule, config) => { +export default (vuexModule, config, init) => { const namespace = config.namespace || ''; return function(store) { @@ -61,6 +61,10 @@ export default (vuexModule, config) => { } }); + if (init) { + init(store, ctx); + } + // Turn all the objects in the store from the server into proxies const state = fromServer?.state?.[namespace]; diff --git a/shell/plugins/steve/actions.js b/shell/plugins/steve/actions.js index 2befd1f9b1..4e502f4042 100644 --- a/shell/plugins/steve/actions.js +++ b/shell/plugins/steve/actions.js @@ -1,6 +1,6 @@ import https from 'https'; import { addParam, parse as parseUrl, stringify as unParseUrl } from '@shell/utils/url'; -import { handleSpoofedRequest } from '@shell/plugins/dashboard-store/actions'; +import { handleSpoofedRequest, loadSchemas } from '@shell/plugins/dashboard-store/actions'; import { set } from '@shell/utils/object'; import { deferred } from '@shell/utils/promise'; import { streamJson, streamingSupported } from '@shell/utils/stream'; @@ -9,6 +9,12 @@ import { classify } from '@shell/plugins/dashboard-store/classify'; import { NAMESPACE } from '@shell/config/types'; export default { + + // Need to override this, so that thhe 'this' context is correct (this class not the base class) + async loadSchemas(ctx, watch = true) { + return await loadSchemas(ctx, watch); + }, + async request({ state, dispatch, rootGetters }, pOpt ) { const opt = pOpt.opt || pOpt; diff --git a/shell/plugins/steve/index.js b/shell/plugins/steve/index.js index 659b9d4f3d..ad0f6f0463 100644 --- a/shell/plugins/steve/index.js +++ b/shell/plugins/steve/index.js @@ -1,6 +1,6 @@ import coreStore, { coreStoreModule, coreStoreState } from '@shell/plugins/dashboard-store/index'; - import { + createWorker, mutations as subscribeMutations, actions as subscribeActions, getters as subscribeGetters @@ -65,7 +65,10 @@ export default (config) => { } return coreStore( - SteveFactory(config), + SteveFactory(config.namespace, config.baseUrl), config, + (store, ctx) => { + createWorker(store, ctx); + } ); }; diff --git a/shell/plugins/steve/mutations.js b/shell/plugins/steve/mutations.js index 70915ce466..30be4ec79c 100644 --- a/shell/plugins/steve/mutations.js +++ b/shell/plugins/steve/mutations.js @@ -1,5 +1,5 @@ import { addObject, removeObject } from '@shell/utils/array'; -import { NAMESPACE, POD } from '@shell/config/types'; +import { NAMESPACE, POD, SCHEMA } from '@shell/config/types'; import { forgetType, resetStore, @@ -52,6 +52,16 @@ export default { cache.map.set(entry.id, entry); }); } + + // Notify the web worker of the initial load of schemas + if (type === SCHEMA) { + const worker = (this.$workers || {})[ctx.getters.storeName]; + + if (worker) { + // Store raw json objects, not the proxies + worker.loadSchema(data); + } + } }, forgetType(state, type) { diff --git a/shell/plugins/steve/subscribe.js b/shell/plugins/steve/subscribe.js index 8b4de5b472..45139357ef 100644 --- a/shell/plugins/steve/subscribe.js +++ b/shell/plugins/steve/subscribe.js @@ -1,5 +1,6 @@ import { addObject, clear, removeObject } from '@shell/utils/array'; import { get } from '@shell/utils/object'; +import { COUNT, SCHEMA } from '@shell/config/types'; import Socket, { EVENT_CONNECTED, EVENT_DISCONNECTED, @@ -13,6 +14,10 @@ import day from 'dayjs'; import { DATE_FORMAT, TIME_FORMAT } from '@shell/store/prefs'; import { escapeHtml } from '@shell/utils/string'; +// eslint-disable-next-line +import Worker from 'worker-loader!./worker' +import * as Comlink from 'comlink'; + export const NO_WATCH = 'NO_WATCH'; export const NO_SCHEMA = 'NO_SCHEMA'; @@ -22,6 +27,30 @@ const MINIMUM_TIME_NOTIFIED = 3000; // minimum time a socket must be disconnected for before sending a growl const MINIMUM_TIME_DISCONNECTED = 10000; +// We only create a worker for the cluster store +export function createWorker(store, ctx) { + const { getters } = ctx; + const storeName = getters.storeName; + + store.$workers = store.$workers || {}; + + if (storeName !== 'cluster') { + return; + } + + function callback(resource) { + queueChange(ctx, resource, true, 'Change'); + } + + if (!store.$workers[storeName]) { + const worker = Comlink.wrap(new Worker()); + + store.$workers[storeName] = worker; + + worker.initWorker(storeName, Comlink.proxy(callback)); + } +} + export function keyForSubscribe({ resourceType, type, namespace, id, selector } = {}) { @@ -78,7 +107,7 @@ function queueChange({ getters, state }, { data, revision }, load, label) { }); } - if ( type === 'schema' ) { + if ( type === SCHEMA ) { // Clear the current records in the store when a type disappears state.queue.push({ action: 'commit', @@ -150,11 +179,23 @@ export const actions = { socket.connect(get(opt, 'metadata')); }, - unsubscribe({ state, commit }) { + unsubscribe({ state, commit, getters }) { const socket = state.socket; + const worker = (this.$workers || {})[getters.storeName]; commit('setWantSocket', false); + if (worker) { + try { + worker.destroyWorker(); + worker[Comlink.releaseProxy](); + } catch (e) { + console.error(e); // eslint-disable-line no-console + } + + delete this.$workers[getters.storeName]; + } + if ( socket ) { return socket.disconnect(); } @@ -523,10 +564,36 @@ export const actions = { }, 'ws.resource.change'(ctx, msg) { - queueChange(ctx, msg, true, 'Change'); - const data = msg.data; const type = data.type; + + // Debounce count changes so we send at most 1 every 5 seconds + if (type === COUNT) { + const worker = (this.$workers || {})[ctx.getters.storeName]; + + if (worker) { + worker.countsUpdate(msg); + + // No further processing - let the web worker debounce the counts + return; + } + } + + // Web worker can process schemas to check that they are actually changing and + // only load updates if the schema did actually change + if (type === SCHEMA) { + const worker = (this.$workers || {})[ctx.getters.storeName]; + + if (worker) { + worker.updateSchema(data); + + // No further processing - let the web worker check the schema updates + return; + } + } + + queueChange(ctx, msg, true, 'Change'); + const typeOption = ctx.rootGetters['type-map/optionsFor'](type); if (typeOption?.alias?.length > 0) { @@ -546,10 +613,19 @@ export const actions = { }, 'ws.resource.remove'(ctx, msg) { - queueChange(ctx, msg, false, 'Remove'); - const data = msg.data; const type = data.type; + + if (type === SCHEMA) { + const worker = (this.$workers || {})[ctx.getters.storeName]; + + if (worker) { + worker.removeSchema(data.id); + } + } + + queueChange(ctx, msg, false, 'Remove'); + const typeOption = ctx.rootGetters['type-map/optionsFor'](type); if (typeOption?.alias?.length > 0) { diff --git a/shell/plugins/steve/worker.js b/shell/plugins/steve/worker.js new file mode 100644 index 0000000000..c0597ab93a --- /dev/null +++ b/shell/plugins/steve/worker.js @@ -0,0 +1,129 @@ +import * as Comlink from 'comlink'; +import { SCHEMA } from '@shell/config/types'; + +const COUNTS_FLUSH_TIMEOUT = 5000; +const SCHEMA_FLUSH_TIMEOUT = 2500; + +const state = { + store: '', // Store name + load: undefined, // Load callback to load a resource into the store + counts: [], // Buffer of count resources recieved in a given window + countTimer: undefined, // Tiemr to flush the count buffer + flushTimer: undefined, // Timer to flush the schema chaneg queue + queue: [], // Schema change queue + schemas: {} // Map of schema id to hash to track when a schema actually changes +}; + +// Quick, simple hash function +function hash(str) { + let hash = 0; + + for (let i = 0; i < str.length; i++) { + const char = str.charCodeAt(i); + + hash = (hash << 5) - hash + char; + hash &= hash; + } + + return new Uint32Array([hash])[0].toString(36); +} + +// Quick, simple hash function to generate hash for an object +function hashObj(obj) { + return hash(JSON.stringify(obj, null, 2)); +} + +function flush() { + state.queue.forEach((schema) => { + const hash = hashObj(schema); + const existing = state.schemas[schema.id]; + + if (!existing || (existing && existing !== hash)) { + // console.log(`${ schema.id } CHANGED ${ hash } > ${ existing }`); + state.schemas[schema.id] = hash; + + const msg = { + data: schema, + resourceType: SCHEMA, + type: 'resource.change' + }; + + load(msg); + } + }); + + state.queue = []; + + state.flushTimer = setTimeout(flush, SCHEMA_FLUSH_TIMEOUT); +} + +state.flushTimer = setTimeout(flush, SCHEMA_FLUSH_TIMEOUT); + +// Callback to the store's load function (in the main thread) to process a load +function load(data) { + if (state.load) { + state.load(data); + } +} + +// Web Worker API +const fns = { + initWorker(storeName, loadFn) { + state.store = storeName; + state.load = loadFn; + }, + + destroyWorker() { + clearTimeout(state.countTimer); + clearTimeout(state.flushTimer); + + // Web worker global function to terminate the web worker + close(); + }, + + // Debounce counts messages so we only process at most 1 every 5 seconds + countsUpdate(resource) { + state.counts.push(resource); + + if (!state.countTimer) { + state.countTimer = setTimeout(() => { + const last = state.counts.pop(); + + state.counts = []; + state.countTimer = null; + + load(last); + }, COUNTS_FLUSH_TIMEOUT); + } + }, + + // Called to load schema + loadSchema(schemas) { + schemas.forEach((schema) => { + // These properties are added to the object, but aren't on the raw object, so remove them + // otherwise our comparison will show changes when there aren't any + delete schema._id; + delete schema._group; + + state.schemas[schema.id] = hashObj(schema); + }); + }, + + // Called when schema is updated + updateSchema(schema) { + // Add the schema to the queue to be checked to see if the schema really changed + state.queue.push(schema); + }, + + // Remove the cached schema + removeSchema(id) { + // Remove anything in the queue related to the schema - we don't want to send any pending updates later for a schema that has been removed + state.queue = state.queue.filter(schema => schema.id !== id); + + // Delete the schema from the map, so if it comes back we don't ignore it if the hash is the same + delete state.schemas[id]; + } +}; + +// Expose the Web Worker API - see: https://github.com/GoogleChromeLabs/comlink +Comlink.expose(fns); diff --git a/yarn.lock b/yarn.lock index 8978257eb0..0a82d02db6 100644 --- a/yarn.lock +++ b/yarn.lock @@ -6590,6 +6590,11 @@ combined-stream@^1.0.6, combined-stream@^1.0.8, combined-stream@~1.0.6: dependencies: delayed-stream "~1.0.0" +comlink@^4.3.1: + version "4.3.1" + resolved "https://registry.yarnpkg.com/comlink/-/comlink-4.3.1.tgz#0c6b9d69bcd293715c907c33fe8fc45aecad13c5" + integrity sha512-+YbhUdNrpBZggBAHWcgQMLPLH1KDF3wJpeqrCKieWQ8RL7atmgsgTQko1XEBK6PsecfopWNntopJ+ByYG1lRaA== + commander@2, commander@^2.12.1, commander@^2.18.0, commander@^2.19.0, commander@^2.20.0: version "2.20.3" resolved "https://registry.yarnpkg.com/commander/-/commander-2.20.3.tgz#fd485e84c03eb4881c20722ba48035e8531aeb33" @@ -17542,6 +17547,14 @@ worker-farm@^1.7.0: dependencies: errno "~0.1.7" +worker-loader@^3.0.8: + version "3.0.8" + resolved "https://registry.yarnpkg.com/worker-loader/-/worker-loader-3.0.8.tgz#5fc5cda4a3d3163d9c274a4e3a811ce8b60dbb37" + integrity sha512-XQyQkIFeRVC7f7uRhFdNMe/iJOdO6zxAaR3EWbDp45v3mDhrTi+++oswKNxShUNjPC/1xUp5DB29YKLhFo129g== + dependencies: + loader-utils "^2.0.0" + schema-utils "^3.0.0" + worker-rpc@^0.1.0: version "0.1.1" resolved "https://registry.yarnpkg.com/worker-rpc/-/worker-rpc-0.1.1.tgz#cb565bd6d7071a8f16660686051e969ad32f54d5" From 4c16f368a3e2a96dcc0e71574154d13b712cb2d7 Mon Sep 17 00:00:00 2001 From: Richard Cox Date: Mon, 1 Aug 2022 12:08:16 +0100 Subject: [PATCH 2/2] Fix web worker Inline worker bits to avoid CORS issues in chrome when dashboard is `-head` --- shell/nuxt.config.js | 7 +++++++ shell/plugins/steve/subscribe.js | 2 +- .../steve/{worker.js => web-worker.steve-sub-worker.js} | 0 3 files changed, 8 insertions(+), 1 deletion(-) rename shell/plugins/steve/{worker.js => web-worker.steve-sub-worker.js} (100%) diff --git a/shell/nuxt.config.js b/shell/nuxt.config.js index 52392bc03b..ff26053184 100644 --- a/shell/nuxt.config.js +++ b/shell/nuxt.config.js @@ -453,6 +453,13 @@ export default function(dir, _appConfig) { }, }); + // Ensure there is a fallback for browsers that don't support web workers + config.module.rules.unshift({ + test: /web-worker.[a-z-]+.js/i, + loader: 'worker-loader', + options: { inline: 'fallback' }, + }); + // Prevent warning in log with the md files in the content folder config.module.rules.push({ test: /\.md$/, diff --git a/shell/plugins/steve/subscribe.js b/shell/plugins/steve/subscribe.js index 45139357ef..493090b937 100644 --- a/shell/plugins/steve/subscribe.js +++ b/shell/plugins/steve/subscribe.js @@ -15,7 +15,7 @@ import { DATE_FORMAT, TIME_FORMAT } from '@shell/store/prefs'; import { escapeHtml } from '@shell/utils/string'; // eslint-disable-next-line -import Worker from 'worker-loader!./worker' +import Worker from './web-worker.steve-sub-worker.js' import * as Comlink from 'comlink'; export const NO_WATCH = 'NO_WATCH'; diff --git a/shell/plugins/steve/worker.js b/shell/plugins/steve/web-worker.steve-sub-worker.js similarity index 100% rename from shell/plugins/steve/worker.js rename to shell/plugins/steve/web-worker.steve-sub-worker.js