fix bug with socket close handling of poolUserMap,

improve socket init,
improve interrupt sync timing progression issues
This commit is contained in:
Arthur Lu 2023-07-17 19:41:20 +00:00
parent e0a121009d
commit 98f2c39e6e

View File

@ -17,8 +17,8 @@ let timer = null;
let prevState = {}; let prevState = {};
export function setupClientSync (app, server, options) { export function setupClientSync (app, server, options) {
let schemes = options.schemes; const schemes = options.schemes;
let resourceTypes = options.resourcetypes; const resourceTypes = options.resourcetypes;
/** /**
* GET - get list of supported synchronization schemes * GET - get list of supported synchronization schemes
* responses: * responses:
@ -56,7 +56,7 @@ export function setupClientSync (app, server, options) {
// setup interupt scheme // setup interupt scheme
if (schemes.interrupt.enabled) { if (schemes.interrupt.enabled) {
const wsServer = new WebSocketServer({ noServer: true, path: "/api/sync/interrupt" }); const wsServer = new WebSocketServer({ noServer: true, path: "/api/sync/interrupt" });
wsServer.on("connection", (socket, username) => { wsServer.on("connection", (socket, username, pool) => {
// add new socket to userSocketmap // add new socket to userSocketmap
if (userSocketMap[username]) { if (userSocketMap[username]) {
const index = Object.keys(userSocketMap[username]).length; const index = Object.keys(userSocketMap[username]).length;
@ -68,7 +68,6 @@ export function setupClientSync (app, server, options) {
socket.userIndex = 0; socket.userIndex = 0;
} }
// add user to associated pool in poolUserMap // add user to associated pool in poolUserMap
const pool = db.getUserConfig(username).cluster.pool;
if (poolUserMap[pool]) { if (poolUserMap[pool]) {
poolUserMap[pool][username] = true; poolUserMap[pool][username] = true;
} }
@ -84,17 +83,19 @@ export function setupClientSync (app, server, options) {
socket.on("error", console.error); socket.on("error", console.error);
// handle socket close // handle socket close
socket.on("close", () => { socket.on("close", () => {
// remove closed socket from userSocketMap // remove closed socket from userSocketMap user entry
delete userSocketMap[username][socket.userIndex]; delete userSocketMap[username][socket.userIndex];
// if there are no more sockets for this user, delete the userSocketMap user entry and the poolUserMap user entry
if (Object.keys(userSocketMap[username]).length === 0) { if (Object.keys(userSocketMap[username]).length === 0) {
// delete the user entry
delete userSocketMap[username]; delete userSocketMap[username];
} // remove user from poolUserMap pool entry
// remove user from poolUserMap
const pool = db.getUserConfig(username).cluster.pool;
delete poolUserMap[pool][username]; delete poolUserMap[pool][username];
// if the poolUserMap pool entry is empty, delete the entry
if (Object.keys(poolUserMap[pool]).length === 0) { if (Object.keys(poolUserMap[pool]).length === 0) {
delete poolUserMap[pool]; delete poolUserMap[pool];
} }
}
// remove socket entry from requestedRates // remove socket entry from requestedRates
delete requestedRates[socket.rateIndex]; delete requestedRates[socket.rateIndex];
if (Object.keys(requestedRates).length === 0) { // if there are no requested rates left, clear the timer if (Object.keys(requestedRates).length === 0) { // if there are no requested rates left, clear the timer
@ -148,11 +149,23 @@ export function setupClientSync (app, server, options) {
} }
else { else {
wsServer.handleUpgrade(req, socket, head, (socket) => { wsServer.handleUpgrade(req, socket, head, (socket) => {
wsServer.emit("connection", socket, cookies.username); const pool = db.getUserConfig(cookies.username).cluster.pool;
wsServer.emit("connection", socket, cookies.username, pool);
}); });
} }
}); });
const handleInterruptSync = async () => { const handleInterruptSync = async () => {
// queue timeout for next iteration with delay of minimum rate
const minRequestedRate = Math.min.apply(null, Object.values(requestedRates));
// if the minimum rate is not Infinity, schedule the next timer
if (minRequestedRate < Infinity) {
timer = setTimeout(handleInterruptSync, minRequestedRate);
}
// if the minimum rate is Infinity, then don't schedule anything and set timer to null
else {
timer = null;
return;
}
// get current cluster resources // get current cluster resources
const status = (await requestPVE("/cluster/resources", "GET", null, null, pveAPIToken)).data.data; const status = (await requestPVE("/cluster/resources", "GET", null, null, pveAPIToken)).data.data;
// filter out just state information of resources that are needed, and hash each one // filter out just state information of resources that are needed, and hash each one
@ -208,8 +221,6 @@ export function setupClientSync (app, server, options) {
} }
// set prevState for next iteration // set prevState for next iteration
prevState = currState; prevState = currState;
// queue timeout for next iteration with delay of minimum rate
timer = setTimeout(handleInterruptSync, Math.min.apply(null, Object.values(requestedRates)));
}; };
console.log("clientsync: enabled interrupt sync"); console.log("clientsync: enabled interrupt sync");
} }