/*
 * dashboard/shell/plugins/steve/subscribe.js
 *
 * 766 lines
 * 19 KiB
 * JavaScript
 */

import { addObject, clear, removeObject } from '@shell/utils/array';
import { get } from '@shell/utils/object';
import { COUNT, SCHEMA } from '@shell/config/types';
import Socket, {
EVENT_CONNECTED,
EVENT_DISCONNECTED,
EVENT_MESSAGE,
// EVENT_FRAME_TIMEOUT,
EVENT_CONNECT_ERROR,
EVENT_DISCONNECT_ERROR
} from '@shell/utils/socket';
import { normalizeType } from '@shell/plugins/dashboard-store/normalize';
import day from 'dayjs';
import { DATE_FORMAT, TIME_FORMAT } from '@shell/store/prefs';
import { escapeHtml } from '@shell/utils/string';
// eslint-disable-next-line
import Worker from './web-worker.steve-sub-worker.js'
import * as Comlink from 'comlink';
// Reason codes stored against a subscription key when the server reports a resource error
export const NO_WATCH = 'NO_WATCH';
export const NO_SCHEMA = 'NO_SCHEMA';

// Minimum length of time (ms) a disconnect notification is shown
const MINIMUM_TIME_NOTIFIED = 3 * 1000;

// Minimum time (ms) a socket must stay disconnected before a growl is sent
const MINIMUM_TIME_DISCONNECTED = 10 * 1000;
/**
 * Create the web worker for a store (once) and remember it on `store.$workers`.
 *
 * Only the store named `cluster` gets a worker; for every other store the call
 * merely makes sure the `$workers` registry exists.
 *
 * @param {object} store - object carrying the `$workers` registry
 * @param {object} ctx - store context; `ctx.getters.storeName` is the registry key
 */
export function createWorker(store, ctx) {
  const name = ctx.getters.storeName;

  store.$workers = store.$workers || {};

  // Nothing to do for other stores, or when the worker already exists
  if (name !== 'cluster' || store.$workers[name]) {
    return;
  }

  const instance = Comlink.wrap(new Worker());

  store.$workers[name] = instance;

  // Resources handed back by the worker are queued as loads labelled 'Change'
  instance.initWorker(name, Comlink.proxy((resource) => {
    queueChange(ctx, resource, true, 'Change');
  }));
}
/**
 * Build the `type/namespace/id/selector` key used to index per-subscription state.
 *
 * `resourceType` wins over `type`; every missing or falsy part becomes an empty segment.
 *
 * @param {object} [watch] - subscription description
 * @returns {string} key with exactly three `/` separators added around the parts
 */
export function keyForSubscribe({
  resourceType, type, namespace, id, selector
} = {}) {
  const segments = [resourceType || type, namespace, id, selector];

  return segments.map((segment) => segment || '').join('/');
}
/**
 * Decide whether two watch descriptions refer to the same subscription.
 *
 * `type` must be strictly equal. `id`, `namespace` and `selector` match when they
 * are strictly equal or when both sides are falsy (so `undefined`, `null` and `''`
 * are interchangeable).
 *
 * @param {object} a - first watch
 * @param {object} b - second watch
 * @returns {boolean} true when the watches are equivalent
 */
export function equivalentWatch(a, b) {
  if ( a.type !== b.type ) {
    return false;
  }

  const optionalFields = ['id', 'namespace', 'selector'];

  return optionalFields.every((field) => {
    const left = a[field];
    const right = b[field];

    return left === right || (!left && !right);
  });
}
/**
 * Turn a resource event into entries on `state.queue` for a later flush.
 *
 * Events for types without a `typeEntry` are ignored. Otherwise the type's
 * `revision` is raised to the event's revision (when that is numeric) and:
 *  - `load === true`  queues a `dispatch('load', data)`;
 *  - `load === false` queues a `commit('remove', obj)` for the stored object (if any),
 *    plus a `commit('forgetType', data.id)` when the removed resource is a schema.
 *
 * @param {object} ctx - store context; `getters` and `state` are used
 * @param {object} msg - event carrying `data` (the resource) and `revision`
 * @param {boolean} load - true to queue a load, false to queue a removal
 * @param {string} label - event name; only referenced by the disabled debug log below
 */
function queueChange({ getters, state }, { data, revision }, load, label) {
  const type = getters.normalizeType(data.type);
  const entry = getters.typeEntry(type);

  if ( !entry ) {
    return;
  }

  const parsedRevision = parseInt(revision, 10);

  // Math.max() yields NaN as soon as one argument is NaN, which would poison the
  // stored revision; only move it forward when the event carries a numeric revision.
  if ( !Number.isNaN(parsedRevision) ) {
    entry.revision = Math.max(entry.revision || 0, parsedRevision);
  }

  // console.log(`${ label } Event [${ state.config.namespace }]`, data.type, data.id); // eslint-disable-line no-console

  if ( load ) {
    state.queue.push({
      action: 'dispatch',
      event:  'load',
      body:   data
    });

    return;
  }

  const obj = getters.byId(data.type, data.id);

  if ( obj ) {
    state.queue.push({
      action: 'commit',
      event:  'remove',
      body:   obj
    });
  }

  if ( type === SCHEMA ) {
    // Clear the current records in the store when a type disappears
    state.queue.push({
      action: 'commit',
      event:  'forgetType',
      body:   data.id
    });
  }
}
export const actions = {
  /**
   * Create (or re-use) the store's socket for `<baseUrl>/subscribe` and connect it.
   *
   * Returns before touching any state when the `isSingleProduct` root getter sets
   * `disableSteveSockets`. On the server (`process.server`) only `wantSocket` is recorded.
   *
   * @param {object} ctx - action context
   * @param {object} [opt] - `opt.metadata` is handed to `socket.connect()`
   */
  subscribe(ctx, opt) {
    const {
      state, commit, dispatch, getters, rootGetters
    } = ctx;

    if (rootGetters['isSingleProduct']?.disableSteveSockets) {
      return;
    }

    let socket = state.socket;

    commit('setWantSocket', true);

    if ( process.server ) {
      return;
    }

    state.debugSocket && console.info(`Subscribe [${ getters.storeName }]`); // eslint-disable-line no-console

    const url = `${ state.config.baseUrl }/subscribe`;

    if ( socket ) {
      socket.setAutoReconnect(true);
      socket.setUrl(url);
    } else {
      socket = new Socket(url);

      commit('setSocket', socket);

      socket.addEventListener(EVENT_CONNECTED, (e) => {
        dispatch('opened', e);
      });

      socket.addEventListener(EVENT_DISCONNECTED, (e) => {
        dispatch('closed', e);
      });

      socket.addEventListener(EVENT_CONNECT_ERROR, (e) => {
        dispatch('error', e );
      });

      socket.addEventListener(EVENT_DISCONNECT_ERROR, (e) => {
        dispatch('error', e );
      });

      socket.addEventListener(EVENT_MESSAGE, (e) => {
        const event = e.detail;

        if ( !event.data ) {
          return;
        }

        let msg;

        // A frame that is not valid JSON is logged and dropped instead of
        // throwing out of the event listener.
        try {
          msg = JSON.parse(event.data);
        } catch (err) {
          console.error(`Invalid WebSocket message [${ getters.storeName }]`, err); // eslint-disable-line no-console

          return;
        }

        // Each named message is routed to the matching `ws.<name>` action
        if ( msg?.name ) {
          dispatch(`ws.${ msg.name }`, msg);
        }
      });
    }

    socket.connect(get(opt, 'metadata'));
  },

  /**
   * Record that the socket is no longer wanted, tear down this store's worker
   * (if one is registered on `this.$workers`) and disconnect the socket.
   *
   * @returns {*} whatever `socket.disconnect()` returns, or undefined without a socket
   */
  unsubscribe({ state, commit, getters }) {
    const socket = state.socket;
    const worker = (this.$workers || {})[getters.storeName];

    commit('setWantSocket', false);

    if (worker) {
      // Best effort: a failure while destroying the worker is logged, and the
      // registry entry is removed regardless.
      try {
        worker.destroyWorker();
        worker[Comlink.releaseProxy]();
      } catch (e) {
        console.error(e); // eslint-disable-line no-console
      }

      delete this.$workers[getters.storeName];
    }

    if ( socket ) {
      return socket.disconnect();
    }
  },

  /**
   * Drain `state.queue`, applying the queued dispatches and commits in order.
   * Consecutive `load` dispatches are batched into a single `loadMulti`.
   *
   * @throws {Error} when a queued entry has an action other than `dispatch` or `commit`
   */
  async flush({
    state, commit, dispatch, getters
  }) {
    const queue = state.queue;
    let toLoad = [];

    if ( !queue.length ) {
      return;
    }

    const started = Date.now();

    // Swap in a fresh queue so events arriving during the flush are kept for the next one
    state.queue = [];

    state.debugSocket && console.debug(`Subscribe Flush [${ getters.storeName }]`, queue.length, 'items'); // eslint-disable-line no-console

    for ( const { action, event, body } of queue ) {
      if ( action === 'dispatch' && event === 'load' ) {
        // Group loads into one loadMulti when possible
        toLoad.push(body);
        continue;
      }

      // When we hit a different kind of event, process all the previous loads, then the other event.
      if ( toLoad.length ) {
        await dispatch('loadMulti', toLoad);
        toLoad = [];
      }

      if ( action === 'dispatch' ) {
        await dispatch(event, body);
      } else if ( action === 'commit' ) {
        commit(event, body);
      } else {
        throw new Error('Invalid queued action');
      }
    }

    // Process any remaining loads
    if ( toLoad.length ) {
      await dispatch('loadMulti', toLoad);
    }

    state.debugSocket && console.debug(`Subscribe Flush [${ getters.storeName }] finished`, Date.now() - started, 'ms'); // eslint-disable-line no-console
  },

  /**
   * On the client, re-run `subscribe` when a socket is wanted but none exists in state.
   */
  rehydrateSubscribe({ state, dispatch }) {
    if ( process.client && state.wantSocket && !state.socket ) {
      dispatch('subscribe');
    }
  },

  /**
   * Ask the server to start (or, with `stop`, to stop) watching a resource.
   *
   * The request is skipped when the type is spoofed, when the watch is in error
   * (unless `stop` or `force` is set) or when an equivalent watch is already started
   * (unless `stop` is set). Without an explicit `revision`, `nextResourceVersion` is used.
   *
   * @param {object} ctx - action context
   * @param {object} params - `type`, `selector`, `id`, `revision`, `namespace`, `stop`, `force`
   * @returns {*} the result of dispatching `send`, or undefined when skipped
   */
  watch({
    state, dispatch, getters, rootGetters
  }, params) {
    state.debugSocket && console.info(`Watch Request [${ getters.storeName }]`, JSON.stringify(params)); // eslint-disable-line no-console

    let {
      // eslint-disable-next-line prefer-const
      type, selector, id, revision, namespace, stop, force
    } = params;

    type = getters.normalizeType(type);

    if (rootGetters['type-map/isSpoofed'](type)) {
      state.debugSocket && console.info('Will not Watch (type is spoofed)', JSON.stringify(params)); // eslint-disable-line no-console

      return;
    }

    if ( !stop && !force && !getters.canWatch(params) ) {
      console.error(`Cannot Watch [${ getters.storeName }]`, JSON.stringify(params)); // eslint-disable-line no-console

      return;
    }

    if ( !stop && getters.watchStarted({
      type, id, selector, namespace
    }) ) {
      state.debugSocket && console.debug(`Already Watching [${ getters.storeName }]`, JSON.stringify(params)); // eslint-disable-line no-console

      return;
    }

    if ( typeof revision === 'undefined' ) {
      revision = getters.nextResourceVersion(type, id);
    }

    const msg = { resourceType: type };

    if ( revision ) {
      msg.resourceVersion = `${ revision }`;
    }

    if ( namespace ) {
      msg.namespace = namespace;
    }

    if ( stop ) {
      msg.stop = true;
    }

    if ( id ) {
      msg.id = id;
    }

    if ( selector ) {
      msg.selector = selector;
    }

    return dispatch('send', msg);
  },

  /**
   * Re-issue every started watch whose type still has a schema.
   * Each one is first marked stopped and has its `revision` dropped, so `watch`
   * computes a fresh one.
   *
   * @returns {Promise<Array>} settles when all re-issued `watch` dispatches have
   */
  reconnectWatches({
    state, getters, commit, dispatch
  }) {
    const promises = [];

    // Iterate a copy: setWatchStopped removes entries from state.started
    for ( const entry of state.started.slice() ) {
      console.info(`Reconnect [${ getters.storeName }]`, JSON.stringify(entry)); // eslint-disable-line no-console

      if ( getters.schemaFor(entry.type) ) {
        commit('setWatchStopped', entry);
        delete entry.revision;
        promises.push(dispatch('watch', entry));
      }
    }

    return Promise.all(promises);
  },

  /**
   * Re-fetch the resources behind a watch (forcing the request and the watch) and
   * remove stored objects that the server no longer returns.
   *
   * With an `id` only that resource is re-fetched and the watch's error flag is cleared.
   *
   * @param {object} ctx - action context
   * @param {object} params - `resourceType`, `namespace`, `id`, `selector`
   */
  async resyncWatch({
    state, getters, dispatch, commit
  }, params) {
    const {
      resourceType, namespace, id, selector
    } = params;

    console.info(`Resync [${ getters.storeName }]`, params); // eslint-disable-line no-console

    const opt = { force: true, forceWatch: true };

    if ( id ) {
      await dispatch('find', {
        type: resourceType,
        id,
        opt,
      });
      commit('clearInError', params);

      return;
    }

    let have, want;

    if ( selector ) {
      have = getters['matching'](resourceType, selector).slice();
      want = await dispatch('findMatching', {
        type: resourceType,
        selector,
        opt,
      });
    } else {
      have = getters['all'](resourceType).slice();

      if ( namespace ) {
        have = have.filter(x => x.metadata?.namespace === namespace);
      }

      want = await dispatch('findAll', {
        type:           resourceType,
        watchNamespace: namespace,
        opt
      });
    }

    // A Set (rather than a plain object) so ids such as 'constructor' are not
    // mistaken for wanted entries through inherited properties.
    const wantedIds = new Set();

    for ( const obj of want ) {
      wantedIds.add(obj.id);
    }

    for ( const obj of have ) {
      if ( !wantedIds.has(obj.id) ) {
        state.debugSocket && console.info(`Remove stale [${ getters.storeName }]`, resourceType, obj.id); // eslint-disable-line no-console

        commit('remove', obj);
      }
    }
  },

  /**
   * Socket connected: start the once-a-second queue flush loop (if not running),
   * re-establish watches after a reconnect, clear the matching disconnect growl,
   * and resend frames that were queued while the socket was down.
   *
   * @param {object} ctx - action context
   * @param {Event} event - connect event; `event.currentTarget` is the socket
   */
  async opened({
    commit, dispatch, state, getters, rootGetters
  }, event) {
    state.debugSocket && console.info(`WebSocket Opened [${ getters.storeName }]`); // eslint-disable-line no-console

    const socket = event.currentTarget;

    this.$socket = socket;

    if ( !state.queue ) {
      state.queue = [];
    }

    if ( !state.queueTimer ) {
      // Self-rescheduling loop: flush, then arm the next run one second later
      state.flushQueue = async() => {
        if ( state.queue.length ) {
          await dispatch('flush');
        }

        state.queueTimer = setTimeout(state.flushQueue, 1000);
      };

      state.flushQueue();
    }

    if ( socket.hasReconnected ) {
      await dispatch('reconnectWatches');

      // Check for disconnect notifications and clear them
      const growlErr = rootGetters['growl/find']({ key: 'url', val: this.$socket.url });

      if (growlErr) {
        const now = Date.now();

        // even if the socket reconnected, keep the error growl for at least a few seconds to ensure its readable
        if (now >= growlErr.earliestClose) {
          dispatch('growl/remove', growlErr.id, { root: true });
        } else {
          setTimeout(() => {
            dispatch('growl/remove', growlErr.id, { root: true });
          }, growlErr.earliestClose - now);
        }
      }
    }

    // Try resending any frames that were attempted to be sent while the socket was down, once.
    if ( !process.server ) {
      for ( const obj of state.pendingFrames.slice() ) {
        commit('dequeuePendingFrame', obj);
        dispatch('sendImmediate', obj);
      }
    }
  },

  /**
   * Socket closed: cancel the pending queue flush timer.
   */
  closed({ state, getters }) {
    state.debugSocket && console.info(`WebSocket Closed [${ getters.storeName }]`); // eslint-disable-line no-console
    clearTimeout(state.queueTimer);
    state.queueTimer = null;
  },

  /**
   * Socket error: cancel the pending queue flush timer, then either schedule a
   * "disconnected" growl (for `EVENT_DISCONNECT_ERROR`) or log the error.
   *
   * @param {object} ctx - action context
   * @param {Event} e - error event; `type`, `detail` and `srcElement` are read
   */
  error({
    getters, state, dispatch, rootGetters
  }, e) {
    clearTimeout(state.queueTimer);
    state.queueTimer = null;

    if (e.type === EVENT_DISCONNECT_ERROR) {
      // do not send a growl notification unless the socket stays disconnected for more than MINIMUM_TIME_DISCONNECTED
      setTimeout(() => {
        if (state.socket.isConnected()) {
          return;
        }

        const dateFormat = escapeHtml( rootGetters['prefs/get'](DATE_FORMAT));
        const timeFormat = escapeHtml( rootGetters['prefs/get'](TIME_FORMAT));
        const time = e?.srcElement?.disconnectedAt || Date.now();

        const timeFormatted = `${ day(time).format(`${ dateFormat } ${ timeFormat }`) }`;
        const url = e?.srcElement?.url;

        const t = rootGetters['i18n/t'];

        dispatch('growl/error', {
          title:         t('growl.disconnected.title'),
          message:       t('growl.disconnected.message', { url, time: timeFormatted }, { raw: true }),
          icon:          'error',
          // `opened` keeps the growl on screen until this timestamp has passed
          earliestClose: time + MINIMUM_TIME_NOTIFIED + MINIMUM_TIME_DISCONNECTED,
          url
        }, { root: true });
      }, MINIMUM_TIME_DISCONNECTED);
    } else {
      // Any other error type (the only other one wired up in `subscribe` is the connect error) is just logged
      console.error(`WebSocket Connection Error [${ getters.storeName }]`, e.detail); // eslint-disable-line no-console
    }
  },

  /**
   * Send a frame over the socket; when there is no socket or the send is not
   * accepted, keep the frame in `pendingFrames` so `opened` can retry it.
   */
  send({ state, commit }, obj) {
    if ( state.socket ) {
      const ok = state.socket.send(JSON.stringify(obj));

      if ( ok ) {
        return;
      }
    }

    commit('enqueuePendingFrame', obj);
  },

  /**
   * Send a frame over the socket without queueing it on failure.
   *
   * @returns {*} the result of `socket.send()`, or undefined without a socket
   */
  sendImmediate({ state }, obj) {
    if ( state.socket ) {
      return state.socket.send(JSON.stringify(obj));
    }
  },

  /**
   * Server ping: for the `management` store, publish the server version it carries.
   */
  'ws.ping'({ getters, dispatch }, msg) {
    if ( getters.storeName === 'management' ) {
      const version = msg?.data?.version || null;

      dispatch('updateServerVersion', version, { root: true });
      console.info(`Ping [${ getters.storeName }] from ${ version || 'unknown version' }`); // eslint-disable-line no-console
    }
  },

  /**
   * The server confirmed a watch: record it as started.
   */
  'ws.resource.start'({ state, getters, commit }, msg) {
    state.debugSocket && console.info(`Resource start: [${ getters.storeName }]`, msg); // eslint-disable-line no-console
    commit('setWatchStarted', {
      type:      msg.resourceType,
      namespace: msg.namespace,
      id:        msg.id,
      selector:  msg.selector
    });
  },

  /**
   * The server reported a watch error. Depending on the error text the type is
   * flagged `NO_WATCH` / `NO_SCHEMA`, or (for "too old") the watch is resynced.
   * Messages without a string error are only logged.
   */
  'ws.resource.error'({ getters, commit, dispatch }, msg) {
    const rawError = msg.data?.error;

    console.warn(`Resource error [${ getters.storeName }]`, msg.resourceType, ':', rawError); // eslint-disable-line no-console

    // Guard against a missing or non-string error so the matching below cannot throw
    const err = typeof rawError === 'string' ? rawError.toLowerCase() : '';

    if ( err.includes('watch not allowed') ) {
      commit('setInError', { type: msg.resourceType, reason: NO_WATCH });
    } else if ( err.includes('failed to find schema') ) {
      commit('setInError', { type: msg.resourceType, reason: NO_SCHEMA });
    } else if ( err.includes('too old') ) {
      dispatch('resyncWatch', msg);
    }
  },

  /**
   * The server stopped a watch. When the type still has a schema and the watch was
   * recorded as started, mark it stopped and request it again after five seconds.
   */
  'ws.resource.stop'({ getters, commit, dispatch }, msg) {
    const type = msg.resourceType;
    const obj = {
      type,
      id:        msg.id,
      namespace: msg.namespace,
      selector:  msg.selector
    };

    // console.warn(`Resource stop: [${ getters.storeName }]`, msg); // eslint-disable-line no-console

    if ( getters['schemaFor'](type) && getters['watchStarted'](obj) ) {
      // Try reconnecting once
      commit('setWatchStopped', obj);

      setTimeout(() => {
        // Delay a bit so that immediate start/error/stop causes
        // only a slow infinite loop instead of a tight one.
        dispatch('watch', obj);
      }, 5000);
    }
  },

  /**
   * A resource was created: queue it for loading.
   */
  'ws.resource.create'(ctx, msg) {
    queueChange(ctx, msg, true, 'Create');
  },

  /**
   * A resource changed. Count and schema changes are handed to the store's worker
   * when one exists; everything else is queued for loading, once under its own type
   * and once per alias type configured in the type-map.
   */
  'ws.resource.change'(ctx, msg) {
    const data = msg.data;
    const type = data.type;

    // Debounce count changes so we send at most 1 every 5 seconds
    if (type === COUNT) {
      const worker = (this.$workers || {})[ctx.getters.storeName];

      if (worker) {
        worker.countsUpdate(msg);

        // No further processing - let the web worker debounce the counts
        return;
      }
    }

    // Web worker can process schemas to check that they are actually changing and
    // only load updates if the schema did actually change
    if (type === SCHEMA) {
      const worker = (this.$workers || {})[ctx.getters.storeName];

      if (worker) {
        worker.updateSchema(data);

        // No further processing - let the web worker check the schema updates
        return;
      }
    }

    queueChange(ctx, msg, true, 'Change');

    const typeOption = ctx.rootGetters['type-map/optionsFor'](type);

    if (typeOption?.alias?.length > 0) {
      typeOption.alias.forEach((aliasType) => {
        ctx.state.queue.push({
          action: 'dispatch',
          event:  'load',
          body:   {
            ...data,
            type: aliasType,
          },
        });
      });
    }
  },

  /**
   * A resource was removed. Schema removals are also reported to the store's worker
   * when one exists. The removal is queued for the resource's own type and for the
   * copy stored under each alias type, when such a copy exists.
   */
  'ws.resource.remove'(ctx, msg) {
    const data = msg.data;
    const type = data.type;

    if (type === SCHEMA) {
      const worker = (this.$workers || {})[ctx.getters.storeName];

      if (worker) {
        worker.removeSchema(data.id);
      }
    }

    queueChange(ctx, msg, false, 'Remove');

    const typeOption = ctx.rootGetters['type-map/optionsFor'](type);

    if (typeOption?.alias?.length > 0) {
      typeOption.alias.forEach((aliasType) => {
        const obj = ctx.getters.byId(aliasType, data.id);

        // Same guard as queueChange(): never queue a removal without an object
        if ( obj ) {
          ctx.state.queue.push({
            action: 'commit',
            event:  'remove',
            body:   obj,
          });
        }
      });
    }
  },
};
export const mutations = {
  /** Store the socket instance. */
  setSocket(state, socket) {
    state.socket = socket;
  },

  /** Record whether a socket connection is wanted. */
  setWantSocket(state, want) {
    state.wantSocket = want;
  },

  /** Remember a frame that could not be sent yet. */
  enqueuePendingFrame(state, obj) {
    state.pendingFrames.push(obj);
  },

  /** Forget a previously queued frame. */
  dequeuePendingFrame(state, obj) {
    removeObject(state.pendingFrames, obj);
  },

  /**
   * Record a watch as started (unless an equivalent one already is) and clear
   * any error recorded under its subscription key.
   */
  setWatchStarted(state, obj) {
    const existing = state.started.find(entry => equivalentWatch(obj, entry));

    if ( !existing ) {
      addObject(state.started, obj);
    }

    delete state.inError[keyForSubscribe(obj)];
  },

  /**
   * Remove the started watch equivalent to `obj`; warns when there is none.
   */
  setWatchStopped(state, obj) {
    const existing = state.started.find(entry => equivalentWatch(obj, entry));

    if ( existing ) {
      removeObject(state.started, existing);
    } else {
      console.warn("Tried to remove a watch that doesn't exist", obj); // eslint-disable-line no-console
    }
  },

  /** Record `msg.reason` under the subscription key of `msg`. */
  setInError(state, msg) {
    const key = keyForSubscribe(msg);

    state.inError[key] = msg.reason;
  },

  /** Clear the error recorded under the subscription key of `msg`. */
  clearInError(state, msg) {
    const key = keyForSubscribe(msg);

    delete state.inError[key];
  },

  /** Turn socket debug logging on, unless `on` is exactly `false`. */
  debug(state, on) {
    state.debugSocket = on !== false;
  },

  /**
   * Drop all subscription bookkeeping: started watches, pending frames, queued
   * events, deferred requests and the queue flush timer.
   */
  resetSubscriptions(state) {
    clear(state.started);
    clear(state.pendingFrames);
    clear(state.queue);
    // The flush timer is created with setTimeout(), so cancel it with the matching call
    clearTimeout(state.queueTimer);
    state.deferredRequests = {};
    state.queueTimer = null;
  }
};
export const getters = {
  /**
   * @returns {Function} `(obj) => boolean` - true when no error is recorded under
   *   the subscription key of `obj`
   */
  canWatch: state => (obj) => {
    return !state.inError[keyForSubscribe(obj)];
  },

  /**
   * @returns {Function} `(obj) => boolean` - true when a watch equivalent to `obj`
   *   is recorded as started
   */
  watchStarted: state => (obj) => {
    return !!state.started.find(entry => equivalentWatch(obj, entry));
  },

  /**
   * @returns {Function} `(type, id) => number|null` - the resource version to start
   *   a watch from. With an `id`, the stored resource's own `resourceVersion` is used
   *   when it is numeric; otherwise the highest of the type's cached revision and the
   *   numeric `resourceVersion`s in its list. `null` when the type is not cached or
   *   no usable revision exists.
   */
  nextResourceVersion: (state, getters) => (type, id) => {
    type = normalizeType(type);
    let revision = 0;

    if ( id ) {
      const existing = getters['byId'](type, id);

      revision = parseInt(existing?.metadata?.resourceVersion, 10);
    }

    // Also taken when the resource's own version was missing or unparseable (NaN)
    if ( !revision ) {
      const cache = state.types[type];

      if ( !cache ) {
        return null;
      }

      revision = cache.revision || 0;

      for ( const obj of cache.list ) {
        const neu = parseInt(obj?.metadata?.resourceVersion, 10);

        // Math.max() returns NaN if any argument is NaN; skip unparseable versions
        // so one bad entry does not discard every valid revision.
        if ( !Number.isNaN(neu) ) {
          revision = Math.max(revision, neu);
        }
      }
    }

    return revision || null;
  },

  /**
   * @returns {Function} `(type) => *` - the cached `generation` of the type, or
   *   `null` when the type is not cached
   */
  currentGeneration: state => (type) => {
    type = normalizeType(type);

    const cache = state.types[type];

    if ( !cache ) {
      return null;
    }

    return cache.generation;
  },
};