From e869834eb31d450ff7f9bd444f7491c251216606 Mon Sep 17 00:00:00 2001
From: Arthur Lu
Date: Fri, 14 Jul 2023 21:48:06 +0000
Subject: [PATCH] implement interrupt sync

---
 README.md                    |   1 +
 config/template.localdb.json |  21 +++-
 src/clientsync.js            | 198 ++++++++++++++++++++++++++++++++---
 src/main.js                  |   3 +-
 src/utils.js                 |   4 +
 5 files changed, 207 insertions(+), 20 deletions(-)

diff --git a/README.md b/README.md
index 0a90bf2..100000d 100644
--- a/README.md
+++ b/README.md
@@ -16,6 +16,7 @@ In Proxmox VE, follow the following steps:
    - VM.* except VM.Audit, VM.Backup, VM.Clone, VM.Console, VM.Monitor, VM.PowerMgmt, VM.Snapshot, VM.Snapshot.Rollback
    - Datastore.Allocate, Datastore.AllocateSpace, Datastore.Audit
    - User.Modify
+   - Pool.Audit
 4. Add a new API Token Permission with path: `/`, select the API token created previously, and role: `proxmoxaas-api`
 5. Add a new User Permission with path: `/`, select the `proxmoxaas-api` user, and role: `proxmoxaas-api`
diff --git a/config/template.localdb.json b/config/template.localdb.json
index c2f801a..8debdf9 100644
--- a/config/template.localdb.json
+++ b/config/template.localdb.json
@@ -89,9 +89,24 @@
 			}
 		},
 		"clientsync": {
-			"always": true,
-			"hash": true,
-			"interrupt": true
+			"resourcetypes": [
+				"lxc",
+				"qemu",
+				"node"
+			],
+			"schemes": {
+				"always": {
+					"enabled": true
+				},
+				"hash": {
+					"enabled": true
+				},
+				"interrupt": {
+					"min-rate": 1,
+					"max-rate": 60,
+					"enabled": true
+				}
+			}
 		}
 	},
 	"users": {
diff --git a/src/clientsync.js b/src/clientsync.js
index 2f9acfe..486c1cd 100644
--- a/src/clientsync.js
+++ b/src/clientsync.js
@@ -2,14 +2,23 @@ import { WebSocketServer } from "ws";
 import * as cookie from "cookie";
 
 import { requestPVE } from "./pve.js";
-import { checkAuth, getObjectHash } from "./utils.js";
+import { checkAuth, getObjectHash, getTimeLeft } from "./utils.js";
+import { db, pveAPIToken } from "./db.js";
 
 // maps usernames to socket object(s)
 const userSocketMap = {};
-// maps proxmox resource ids to user(s) who can access the resource
-const resourceUserMap = {};
+// maps pool ids to users
+const poolUserMap = {};
+// maps sockets to their requested rates
+const requestedRates = {};
+// stores the next queued interrupt handler
+let timer = null;
+// previous cluster state for interrupt handler
+let prevState = {};
 
-export function setupClientSync (app, server, schemes) {
+export function setupClientSync (app, server, options) {
+	let schemes = options.schemes;
+	let resourceTypes = options.resourcetypes;
 	/**
 	 * GET - get list of supported synchronization schemes
 	 * responses:
 	 */
 	app.get("/api/sync/schemes", async (req, res) => {
 		res.send(schemes);
 	});
-
-	if (schemes.hash) {
+	if (schemes.always.enabled) {
+		console.log("clientsync: enabled always sync");
+	}
+	// setup hash scheme
+	if (schemes.hash.enabled) {
 		/**
 		 * GET - get hash of current cluster resources states
 		 * Client can use this endpoint to check for cluster state changes to avoid costly data transfers to the client.
@@ -36,24 +48,98 @@ export function setupClientSync (app, server, schemes) {
 			// get current cluster resources
 			const status = (await requestPVE("/cluster/resources", "GET", req.cookies)).data.data;
 			// filter out just state information of resources that are needed
-			const resources = ["lxc", "qemu", "node"];
-			const state = {};
-			status.forEach((element) => {
-				if (resources.includes(element.type)) {
-					state[element.id] = element.status;
-				}
-			});
+			const state = extractClusterState(status, resourceTypes);
 			res.status(200).send(getObjectHash(state));
 		});
+		console.log("clientsync: enabled hash sync");
 	}
-
-	if (schemes.interrupt) {
+	// setup interrupt scheme
+	if (schemes.interrupt.enabled) {
 		const wsServer = new WebSocketServer({ noServer: true, path: "/api/sync/interrupt" });
 		wsServer.on("connection", (socket, username) => {
+			// add new socket to userSocketMap
+			if (userSocketMap[username]) {
+				const index = Object.keys(userSocketMap[username]).length;
+				userSocketMap[username][index] = socket;
+				socket.userIndex = index;
+			}
+			else {
+				userSocketMap[username] = { 0: socket };
+				socket.userIndex = 0;
+			}
+			// add user to associated pool in poolUserMap
+			const pool = db.getUserConfig(username).cluster.pool;
+			if (poolUserMap[pool]) {
+				poolUserMap[pool][username] = true;
+			}
+			else {
+				poolUserMap[pool] = {};
+				poolUserMap[pool][username] = true;
+			}
+			// add socket entry into requestedRates
+			const index = Object.keys(requestedRates).length;
+			requestedRates[index] = Infinity;
+			socket.rateIndex = index;
+			// handle socket error
+			socket.on("error", console.error);
+			// handle socket close
+			socket.on("close", () => {
+				// remove closed socket from userSocketMap
+				delete userSocketMap[username][socket.userIndex];
+				if (Object.keys(userSocketMap[username]).length === 0) {
+					delete userSocketMap[username];
+				}
+				// remove user from poolUserMap
+				const pool = db.getUserConfig(username).cluster.pool;
+				delete poolUserMap[pool][username];
+				if (Object.keys(poolUserMap[pool]).length === 0) {
+					delete poolUserMap[pool];
+				}
+				// remove socket entry from requestedRates
+				delete requestedRates[socket.rateIndex];
+				if (Object.keys(requestedRates).length === 0) { // if there are no requested rates left, clear the timer
+					clearTimeout(timer);
+					timer = null;
+				}
+				// terminate socket
+				socket.terminate();
+			});
+			// handle socket incoming message
 			socket.on("message", (message) => {
-				console.log(message.toString());
+				const parsed = message.toString().split(" ");
+				const cmd = parsed[0];
+				// command is rate and the value is valid
+				if (cmd === "rate" && parsed[1] >= schemes.interrupt["min-rate"] && parsed[1] <= schemes.interrupt["max-rate"]) {
+					// get requested rate in ms
+					const rate = Number(parsed[1]) * 1000;
+					// if timer has not started, start it with the requested rate
+					if (!timer) {
+						timer = setTimeout(handleInterruptSync, rate);
+					}
+					// otherwise, if the timer has started but the new rate is lower than the current minimum
+					// AND the next event trigger is more than the new rate in the future,
+					// restart the timer with the new rate;
+					// this avoids a large requested rate preventing a faster rate from being fulfilled
+					else if (rate < Math.min.apply(null, Object.values(requestedRates)) && getTimeLeft(timer) > rate) {
+						clearTimeout(timer);
+						timer = setTimeout(handleInterruptSync, rate);
+					}
+					// in all cases, record the rate in the list; when the next event trigger happens, the timer will be requeued with the new requested rates
+					requestedRates[socket.rateIndex] = rate;
+				}
+				// command is rate but the requested value is out of bounds, terminate socket
+				else if (cmd === "rate") {
+					socket.send(`error: rate must be in range [${schemes.interrupt["min-rate"]}, ${schemes.interrupt["max-rate"]}].`);
+					socket.terminate();
+				}
+				// otherwise, command is invalid, terminate socket
+				else {
+					socket.send(`error: ${cmd} command not found.`);
+					socket.terminate();
+				}
 			});
 		});
+		// handle the wss upgrade request
 		server.on("upgrade", async (req, socket, head) => {
 			const cookies = cookie.parse(req.headers.cookie || "");
 			const auth = (await requestPVE("/version", "GET", cookies)).status === 200;
@@ -66,5 +152,85 @@
 				});
 			}
 		});
+		const handleInterruptSync = async () => {
+			// get current cluster resources
+			const status = (await requestPVE("/cluster/resources", "GET", null, null, pveAPIToken)).data.data;
+			// filter out just state information of resources that are needed, and hash each one
+			const currState = extractClusterState(status, resourceTypes, true);
+			// get a map of users to send sync notifications to
+			const syncUsers = {};
+			// for each current resource in the cluster, check for state changes
+			Object.keys(currState).forEach((resource) => {
+				// if the resource's current state has changed, add all relevant users to syncUsers
+				const resourceCurrState = currState[resource];
+				const resourcePrevState = prevState[resource];
+				// if the previous state did not exist, or the hashed state (name, type, status, node, pool) has changed, then the resource was added or modified
+				if (!resourcePrevState || resourceCurrState.hash !== resourcePrevState.hash) {
+					// if the resource is a node, send sync to all users
+					if (resourceCurrState.type === "node") {
+						Object.keys(userSocketMap).forEach((user) => {
+							syncUsers[user] = true;
+						});
+					}
+					// if the resource is qemu or lxc, send sync to users in the same pool if there is a pool and if the pool has users
+					else if (resourceCurrState.pool && poolUserMap[resourceCurrState.pool]) {
+						Object.keys(poolUserMap[resourceCurrState.pool]).forEach((user) => {
+							syncUsers[user] = true;
+						});
+					}
+				}
+			});
+			// for each previous resource in the cluster, check for state changes
+			Object.keys(prevState).forEach((resource) => {
+				const resourceCurrState = currState[resource];
+				const resourcePrevState = prevState[resource];
+				// if the resource no longer exists in the current state, then it was lost or deleted
+				if (!resourceCurrState) {
+					// if the resource is a node, send sync to all users
+					if (resourcePrevState.type === "node") {
+						Object.keys(userSocketMap).forEach((user) => {
+							syncUsers[user] = true;
+						});
+					}
+					// if the resource is qemu or lxc, send sync to users in the same pool if there is a pool and if the pool has users
+					else if (resourcePrevState.pool && poolUserMap[resourcePrevState.pool]) {
+						Object.keys(poolUserMap[resourcePrevState.pool]).forEach((user) => {
+							syncUsers[user] = true;
+						});
+					}
+				}
+			});
+			// for each user in syncUsers, send a sync message over their registered sockets
+			for (const user of Object.keys(syncUsers)) {
+				for (const socket of Object.keys(userSocketMap[user])) {
+					userSocketMap[user][socket].send("sync");
+				}
+			}
+			// set prevState for next iteration
+			prevState = currState;
+			// queue timeout for next iteration with delay of the minimum requested rate
+			timer = setTimeout(handleInterruptSync, Math.min.apply(null, Object.values(requestedRates)));
+		};
+		console.log("clientsync: enabled interrupt sync");
 	}
 }
+
+function extractClusterState (status, resourceTypes, hashIndividual = false) {
+	const state = {};
+	status.forEach((resource) => {
+		if (resourceTypes.includes(resource.type)) {
+			state[resource.id] = {
+				name: resource.name || null,
+				type: resource.type,
+				status: resource.status,
+				node: resource.node,
+				pool: resource.pool || null
+			};
+			if (hashIndividual) {
+				const hash = getObjectHash(state[resource.id]);
+				state[resource.id].hash = hash;
+			}
+		}
+	});
+	return state;
+}
diff --git a/src/main.js b/src/main.js
index c9955f3..bbe2296 100644
--- a/src/main.js
+++ b/src/main.js
@@ -1197,7 +1197,8 @@ app.delete(`/api/:node(${nodeRegexP})/:type(${typeRegexP})/:vmid(${vmidRegexP})/
 	await handleResponse(params.node, result, res);
 });
 
+setupClientSync(app, server, db.getGlobalConfig().clientsync);
+
 const server = app.listen(listenPort, () => {
 	console.log(`proxmoxaas-api v${api.version} listening on port ${listenPort}`);
 });
-setupClientSync(app, server, db.getGlobalConfig().clientsync);
diff --git a/src/utils.js b/src/utils.js
index a1fdd41..70edfdb 100644
--- a/src/utils.js
+++ b/src/utils.js
@@ -99,3 +99,7 @@ export function getObjectHash (object, alg = "sha256", format = "hex") {
 	hash.update(JSON.stringify(object, Object.keys(object).sort()));
 	return hash.digest(format);
 }
+
+export function getTimeLeft (timeout) {
+	return Math.ceil((timeout._idleStart + timeout._idleTimeout - (global.process.uptime() * 1000)));
+}
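Example client usage (illustrative sketch, not part of the patch): the snippet below shows how a browser client might consume the interrupt sync scheme added above, assuming the page is served from the same origin as the API and the Proxmox auth cookies are already set (the upgrade handler rejects unauthenticated requests). The /api/sync/interrupt path, the "rate <seconds>" command, the [min-rate, max-rate] bounds, and the "sync" notification all come from src/clientsync.js in this patch; refreshResources is a hypothetical placeholder for whatever routine re-fetches cluster state.

// hypothetical client-side consumer of the interrupt sync scheme (not part of this patch)
const protocol = window.location.protocol === "https:" ? "wss:" : "ws:";
const socket = new WebSocket(`${protocol}//${window.location.host}/api/sync/interrupt`);
socket.addEventListener("open", () => {
	// request a 5 second sync rate; the value must lie within [min-rate, max-rate]
	// from the clientsync config (1 and 60 in the template above), otherwise the
	// server replies with an error and terminates the socket
	socket.send("rate 5");
});
socket.addEventListener("message", (event) => {
	// the server sends "sync" when a relevant node, qemu, or lxc resource changes
	if (event.data === "sync") {
		refreshResources(); // hypothetical helper that re-fetches cluster state from the API
	}
});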