From b86034ae8ff0897315e516f195ec968ee92ed002 Mon Sep 17 00:00:00 2001
From: Arthur Lu <root@tronnet.net>
Date: Tue, 11 Feb 2025 07:11:05 +0000
Subject: [PATCH] add basic synchronization using mutex

---
 app/app.go     |  42 +++++++++++++--------
 app/model.go   | 100 +++++++++++++++++++++++++++++++++++++++++--------
 app/proxmox.go |  56 +++++++++++++--------------
 app/types.go   |  27 +++++++++----
 app/utils.go   |   1 +
 5 files changed, 158 insertions(+), 68 deletions(-)

diff --git a/app/app.go b/app/app.go
index be6fc81..16fdb70 100644
--- a/app/app.go
+++ b/app/app.go
@@ -34,10 +34,14 @@ func Run() {
 
 	cluster := Cluster{}
 	cluster.Init(client)
-	cluster.Rebuild()
+	start := time.Now()
+	log.Printf("Starting cluster sync\n")
+	cluster.Sync()
+	log.Printf("Synced cluster in %fs\n", time.Since(start).Seconds())
 
 	// set repeating update for full rebuilds
-	ticker := time.NewTicker(5 * time.Second)
+	ticker := time.NewTicker(time.Duration(config.ReloadInterval) * time.Second)
+	log.Printf("Initialized cluster sync interval of %ds", config.ReloadInterval)
 	channel := make(chan bool)
 	go func() {
 		for {
@@ -45,8 +49,10 @@ func Run() {
 			case <-channel:
 				return
 			case <-ticker.C:
-				cluster.Rebuild()
-				log.Printf("rebuilt cluster\n")
+				start := time.Now()
+				log.Printf("Starting cluster sync\n")
+				cluster.Sync()
+				log.Printf("Synced cluster in %fs\n", time.Since(start).Seconds())
 			}
 		}
 	}()
@@ -62,34 +68,38 @@ func Run() {
 
 	router.GET("/nodes/:node", func(c *gin.Context) {
 		node := c.Param("node")
-		Host, ok := cluster.Hosts[node]
-		if !ok {
-			c.JSON(http.StatusInternalServerError, gin.H{"error": fmt.Sprintf("%s not found in cluster", node)})
+
+		host, err := cluster.GetHost(node)
+
+		if err != nil {
+			c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
 			return
 		} else {
-			c.JSON(http.StatusOK, gin.H{"node": Host})
+			c.JSON(http.StatusOK, gin.H{"node": host})
 			return
 		}
 	})
 
 	router.GET("/nodes/:node/instances/:instance", func(c *gin.Context) {
-		host := c.Param("node")
+		node := c.Param("node")
 		vmid, err := strconv.ParseUint(c.Param("instance"), 10, 64)
 		if err != nil {
 			c.JSON(http.StatusInternalServerError, gin.H{"error": fmt.Sprintf("%s could not be converted to vmid (uint)", c.Param("instance"))})
 			return
 		}
-		Node, ok := cluster.Hosts[host]
-		if !ok {
-			c.JSON(http.StatusInternalServerError, gin.H{"error": fmt.Sprintf("vmid %s not found in cluster", host)})
+
+		host, err := cluster.GetHost(node)
+
+		if err != nil {
+			c.JSON(http.StatusInternalServerError, gin.H{"error": fmt.Sprintf("node %s not found in cluster", node)})
 			return
 		} else {
-			Instance, ok := Node.Instances[uint(vmid)]
-			if !ok {
-				c.JSON(http.StatusInternalServerError, gin.H{"error": fmt.Sprintf("%d not found in %s", vmid, host)})
+			instance, err := host.GetInstance(uint(vmid))
+			if err != nil {
+				c.JSON(http.StatusInternalServerError, gin.H{"error": fmt.Sprintf("%d not found in %s", vmid, node)})
 				return
 			} else {
-				c.JSON(http.StatusOK, gin.H{"instance": Instance})
+				c.JSON(http.StatusOK, gin.H{"instance": instance})
 				return
 			}
 		}
diff --git a/app/model.go b/app/model.go
index 02839d8..2435a65 100644
--- a/app/model.go
+++ b/app/model.go
@@ -6,16 +6,15 @@ import (
 	"strings"
 )
 
-type Cluster struct {
-	pve   ProxmoxClient
-	Hosts map[string]*Host
-}
-
 func (cluster *Cluster) Init(pve ProxmoxClient) {
 	cluster.pve = pve
 }
 
-func (cluster *Cluster) Rebuild() error {
+func (cluster *Cluster) Sync() error {
+	// acquire lock on cluster, release on return
+	cluster.lock.Lock()
+	defer cluster.lock.Unlock()
+
 	cluster.Hosts = make(map[string]*Host)
 
 	// get all nodes
@@ -26,7 +25,7 @@ func (cluster *Cluster) Rebuild() error {
 	// for each node:
 	for _, hostName := range nodes {
 		// rebuild node
-		err := cluster.RebuildNode(hostName)
+		err := cluster.RebuildHost(hostName)
 		if err != nil {
 			return err
 		}
@@ -35,17 +34,51 @@ func (cluster *Cluster) Rebuild() error {
 	return nil
 }
 
-func (cluster *Cluster) RebuildNode(hostName string) error {
+// get a node in the cluster
+func (cluster *Cluster) GetHost(hostName string) (*Host, error) {
+	host_ch := make(chan *Host)
+	err_ch := make(chan error)
+
+	go func() {
+		// acquire cluster lock
+		cluster.lock.Lock()
+		defer cluster.lock.Unlock()
+		// get host
+		host, ok := cluster.Hosts[hostName]
+		if !ok {
+			host_ch <- nil; err_ch <- fmt.Errorf("%s not in cluster", hostName)
+			return
+		}
+		// acquire host lock to wait in case of a concurrent write
+		host.lock.Lock()
+		defer host.lock.Unlock()
+
+		host_ch <- host
+		err_ch <- nil
+	}()
+
+	host := <-host_ch
+	err := <-err_ch
+	return host, err
+}
+
+func (cluster *Cluster) RebuildHost(hostName string) error {
 	host, err := cluster.pve.Node(hostName)
 	if err != nil {
 		return err
 	}
-	cluster.Hosts[hostName] = &host
+
+	// acquire lock on host, release on return
+	host.lock.Lock()
+	defer host.lock.Unlock()
+
+	cluster.Hosts[hostName] = host
 
 	// get node's VMs
 	vms, err := host.VirtualMachines()
 	if err != nil {
 		return err
+
 	}
 	for _, vmid := range vms {
 		err := host.RebuildVM(vmid)
@@ -69,13 +102,44 @@ func (cluster *Cluster) RebuildNode(hostName string) error {
 	return nil
 }
 
+func (host *Host) GetInstance(vmid uint) (*Instance, error) {
+	instance_ch := make(chan *Instance)
+	err_ch := make(chan error)
+
+	go func() {
+		// acquire host lock
+		host.lock.Lock()
+		defer host.lock.Unlock()
+		// get instance
+		instance, ok := host.Instances[vmid]
+		if !ok {
+			instance_ch <- nil; err_ch <- fmt.Errorf("vmid %d not in host %s", vmid, host.Name)
+			return
+		}
+		// acquire instance lock to wait in case of a concurrent write
+		instance.lock.Lock()
+		defer instance.lock.Unlock()
+
+		instance_ch <- instance
+		err_ch <- nil
+	}()
+
+	instance := <-instance_ch
+	err := <-err_ch
+	return instance, err
+}
+
 func (host *Host) RebuildVM(vmid uint) error {
 	instance, err := host.VirtualMachine(vmid)
 	if err != nil {
 		return err
 	}
 
-	host.Instances[vmid] = &instance
+	// acquire lock on instance, release on return
+	instance.lock.Lock()
+	defer instance.lock.Unlock()
+
+	host.Instances[vmid] = instance
 
 	for volid := range instance.configDisks {
 		instance.RebuildVolume(host, volid)
@@ -86,7 +150,7 @@ func (host *Host) RebuildVM(vmid uint) error {
 	}
 
 	for deviceid := range instance.configHostPCIs {
-		instance.RebuildDevice(*host, deviceid)
+		instance.RebuildDevice(host, deviceid)
 	}
 
 	return nil
@@ -98,7 +162,11 @@ func (host *Host) RebuildCT(vmid uint) error {
 		return err
 	}
 
-	host.Instances[vmid] = &instance
+	// acquire lock on instance, release on return
+	instance.lock.Lock()
+	defer instance.lock.Unlock()
+
+	host.Instances[vmid] = instance
 
 	for volid := range instance.configDisks {
 		instance.RebuildVolume(host, volid)
@@ -114,12 +182,12 @@ func (host *Host) RebuildCT(vmid uint) error {
 func (instance *Instance) RebuildVolume(host *Host, volid string) error {
 	volumeDataString := instance.configDisks[volid]
 
-	volume, _, _, err := GetVolumeInfo(*host, volumeDataString)
+	volume, _, _, err := GetVolumeInfo(host, volumeDataString)
 	if err != nil {
 		return err
 	}
 
-	instance.Volumes[volid] = &volume
+	instance.Volumes[volid] = volume
 
 	return nil
 }
@@ -136,12 +204,12 @@ func (instance *Instance) RebuildNet(netid string) error {
 		return nil
 	}
 
-	instance.Nets[uint(idnum)] = &netinfo
+	instance.Nets[uint(idnum)] = netinfo
 
 	return nil
 }
 
-func (instance *Instance) RebuildDevice(host Host, deviceid string) error {
+func (instance *Instance) RebuildDevice(host *Host, deviceid string) error {
 	instanceDevice, ok := instance.configHostPCIs[deviceid]
 	if !ok { // if device does not exist
 		return fmt.Errorf("%s not found in devices", deviceid)
diff --git a/app/proxmox.go b/app/proxmox.go
index 687ea7a..6b45a38 100644
--- a/app/proxmox.go
+++ b/app/proxmox.go
@@ -57,20 +57,20 @@ func (pve ProxmoxClient) Nodes() ([]string, error) {
 }
 
 // Gets a Node's resources but does not recursively expand instances
-func (pve ProxmoxClient) Node(nodeName string) (Host, error) {
+func (pve ProxmoxClient) Node(nodeName string) (*Host, error) {
 	host := Host{}
 	host.Devices = make(map[string]*Device)
 	host.Instances = make(map[uint]*Instance)
 
 	node, err := pve.client.Node(context.Background(), nodeName)
 	if err != nil {
-		return host, err
+		return &host, err
 	}
 
 	devices := []Device{}
 	err = pve.client.Get(context.Background(), fmt.Sprintf("/nodes/%s/hardware/pci", nodeName), &devices)
 	if err != nil {
-		return host, err
+		return &host, err
 	}
 
 	for _, device := range devices {
@@ -81,14 +81,14 @@ func (pve ProxmoxClient) Node(nodeName string) (Host, error) {
 	host.Cores.Total = uint64(node.CPUInfo.CPUs)
 	host.Memory.Total = uint64(node.Memory.Total)
 	host.Swap.Total = uint64(node.Swap.Total)
-	host.node = node
+	host.pvenode = node
 
-	return host, err
+	return &host, err
 }
 
 // Get all VM IDs on specified host
-func (host Host) VirtualMachines() ([]uint, error) {
-	vms, err := host.node.VirtualMachines(context.Background())
+func (host *Host) VirtualMachines() ([]uint, error) {
+	vms, err := host.pvenode.VirtualMachines(context.Background())
 	if err != nil {
 		return nil, err
 	}
@@ -100,11 +100,11 @@ func (host Host) VirtualMachines() ([]uint, error) {
 }
 
 // Get a VM's CPU, Memory but does not recursively link Devices, Disks, Drives, Nets
-func (host Host) VirtualMachine(VMID uint) (Instance, error) {
+func (host *Host) VirtualMachine(VMID uint) (*Instance, error) {
 	instance := Instance{}
-	vm, err := host.node.VirtualMachine(context.Background(), int(VMID))
+	vm, err := host.pvenode.VirtualMachine(context.Background(), int(VMID))
 	if err != nil {
-		return instance, err
+		return &instance, err
 	}
 
 	config := vm.VirtualMachineConfig
@@ -112,7 +112,7 @@ func (host Host) VirtualMachine(VMID uint) (Instance, error) {
 	instance.configNets = config.MergeNets()
 	instance.configDisks = MergeVMDisksAndUnused(config)
 
-	instance.config = config
+	instance.pveconfig = config
 	instance.Type = VM
 
 	instance.Name = vm.Name
@@ -123,7 +123,7 @@ func (host Host) VirtualMachine(VMID uint) (Instance, error) {
 	instance.Nets = make(map[uint]*Net)
 	instance.Devices = make(map[uint][]*Device)
 
-	return instance, nil
+	return &instance, nil
 }
 
 func MergeVMDisksAndUnused(vmc *proxmox.VirtualMachineConfig) map[string]string {
@@ -135,8 +135,8 @@ func MergeVMDisksAndUnused(vmc *proxmox.VirtualMachineConfig) map[string]string
 }
 
 // Get all CT IDs on specified host
-func (host Host) Containers() ([]uint, error) {
-	cts, err := host.node.Containers(context.Background())
+func (host *Host) Containers() ([]uint, error) {
+	cts, err := host.pvenode.Containers(context.Background())
 	if err != nil {
 		return nil, err
 	}
@@ -148,11 +148,11 @@ func (host Host) Containers() ([]uint, error) {
 }
 
 // Get a CT's CPU, Memory, Swap but does not recursively link Devices, Disks, Drives, Nets
-func (host Host) Container(VMID uint) (Instance, error) {
+func (host *Host) Container(VMID uint) (*Instance, error) {
 	instance := Instance{}
-	ct, err := host.node.Container(context.Background(), int(VMID))
+	ct, err := host.pvenode.Container(context.Background(), int(VMID))
 	if err != nil {
-		return instance, err
+		return &instance, err
 	}
 
 	config := ct.ContainerConfig
@@ -160,7 +160,7 @@ func (host Host) Container(VMID uint) (Instance, error) {
 	instance.configNets = config.MergeNets()
 	instance.configDisks = MergeCTDisksAndUnused(config)
 
-	instance.config = config
+	instance.pveconfig = config
 	instance.Type = CT
 
 	instance.Name = ct.Name
@@ -170,7 +170,7 @@ func (host Host) Container(VMID uint) (Instance, error) {
 	instance.Volumes = make(map[string]*Volume)
 	instance.Nets = make(map[uint]*Net)
 
-	return instance, nil
+	return &instance, nil
 }
 
 func MergeCTDisksAndUnused(cc *proxmox.ContainerConfig) map[string]string {
@@ -186,19 +186,19 @@ func MergeCTDisksAndUnused(cc *proxmox.ContainerConfig) map[string]string {
 }
 
 // get volume fornmat, size, volumeid, and storageid from instance volume data string (eg: local:100/vm-100-disk-0.raw ... )
-func GetVolumeInfo(host Host, volume string) (Volume, string, string, error) {
+func GetVolumeInfo(host *Host, volume string) (*Volume, string, string, error) {
 	volumeData := Volume{}
 
 	storageID := strings.Split(volume, ":")[0]
 	volumeID := strings.Split(volume, ",")[0]
-	storage, err := host.node.Storage(context.Background(), storageID)
+	storage, err := host.pvenode.Storage(context.Background(), storageID)
 	if err != nil {
-		return volumeData, volumeID, storageID, nil
+		return &volumeData, volumeID, storageID, nil
 	}
 
 	content, err := storage.GetContent(context.Background())
 	if err != nil {
-		return volumeData, volumeID, storageID, nil
+		return &volumeData, volumeID, storageID, nil
 	}
 
 	for _, c := range content {
@@ -209,27 +209,27 @@ func GetVolumeInfo(host Host, volume string) (Volume, string, string, error) {
 		}
 	}
 
-	return volumeData, volumeID, storageID, nil
+	return &volumeData, volumeID, storageID, nil
 }
 
-func GetNetInfo(net string) (Net, error) {
+func GetNetInfo(net string) (*Net, error) {
 	n := Net{}
 
 	for _, val := range strings.Split(net, ",") {
 		if strings.HasPrefix(val, "rate=") {
 			rate, err := strconv.ParseUint(strings.TrimPrefix(val, "rate="), 10, 64)
 			if err != nil {
-				return n, err
+				return &n, err
 			}
 			n.Rate = rate
 		} else if strings.HasPrefix(val, "tag=") {
 			vlan, err := strconv.ParseUint(strings.TrimPrefix(val, "tag="), 10, 64)
 			if err != nil {
-				return n, err
+				return &n, err
 			}
 			n.VLAN = vlan
 		}
 	}
 
-	return n, nil
+	return &n, nil
 }
diff --git a/app/types.go b/app/types.go
index fc6984b..a5b9cc8 100644
--- a/app/types.go
+++ b/app/types.go
@@ -1,21 +1,26 @@
 package app
 
-import "github.com/luthermonson/go-proxmox"
+import (
+	"sync"
 
-type Resource struct {
-	Reserved uint64
-	Free     uint64
-	Total    uint64
+	"github.com/luthermonson/go-proxmox"
+)
+
+type Cluster struct {
+	lock  sync.Mutex
+	pve   ProxmoxClient
+	Hosts map[string]*Host
 }
 
 type Host struct {
+	lock      sync.Mutex
 	Name      string
 	Cores     Resource
 	Memory    Resource
 	Swap      Resource
 	Devices   map[string]*Device
 	Instances map[uint]*Instance
-	node      *proxmox.Node
+	pvenode   *proxmox.Node
 }
 
 type InstanceType string
@@ -26,6 +31,7 @@ const (
 )
 
 type Instance struct {
+	lock           sync.Mutex
 	Type           InstanceType
 	Name           string
 	Proctype       string
@@ -35,11 +41,16 @@ type Instance struct {
 	Volumes        map[string]*Volume
 	Nets           map[uint]*Net
 	Devices        map[uint][]*Device
-	config         interface{}
+	pveconfig      interface{}
 	configDisks    map[string]string
 	configNets     map[string]string
 	configHostPCIs map[string]string
-	proxmox.ContainerInterface
+}
+
+type Resource struct {
+	Reserved uint64
+	Free     uint64
+	Total    uint64
 }
 
 type Volume struct {
diff --git a/app/utils.go b/app/utils.go
index b98775a..1d966b6 100644
--- a/app/utils.go
+++ b/app/utils.go
@@ -19,6 +19,7 @@ type Config struct {
 			Secret string `json:"uuid"`
 		}
 	}
+	ReloadInterval int `json:"reloadInterval"`
 }
 
 func GetConfig(configPath string) Config {