From d5fea5e7b848da8a0da5d166b8c7b077c33fa690 Mon Sep 17 00:00:00 2001
From: Arthur Lu
Date: Sun, 9 Nov 2025 23:59:19 +0000
Subject: [PATCH] reimplement synchronization for sync methods

---
 app/app.go   |  61 ++++++-------
 app/model.go | 254 ++++++++++++++++++++++++++++-----------------------
 app/types.go |   3 +-
 3 files changed, 170 insertions(+), 148 deletions(-)

diff --git a/app/app.go b/app/app.go
index 9631413..3160d4d 100644
--- a/app/app.go
+++ b/app/app.go
@@ -25,7 +25,7 @@ func Run() {
 	flag.Parse()
 
 	config := GetConfig(*configPath)
-	log.Printf("[INF] initialized config from %s", *configPath)
+	log.Printf("[INFO] initialized config from %s", *configPath)
 
 	token := fmt.Sprintf(`%s@%s!%s`, config.PVE.Token.USER, config.PVE.Token.REALM, config.PVE.Token.ID)
 	client = NewClient(config.PVE.URL, token, config.PVE.Token.Secret)
@@ -35,13 +35,13 @@ func Run() {
 	cluster := Cluster{}
 	cluster.Init(client)
 	start := time.Now()
-	log.Printf("[INF] starting cluster sync\n")
+	log.Printf("[INFO] starting cluster sync\n")
 	cluster.Sync()
-	log.Printf("[INF] synced cluster in %fs\n", time.Since(start).Seconds())
+	log.Printf("[INFO] synced cluster in %fs\n", time.Since(start).Seconds())
 
 	// set repeating update for full rebuilds
 	ticker := time.NewTicker(time.Duration(config.ReloadInterval) * time.Second)
-	log.Printf("[INF] initialized cluster sync interval of %ds", config.ReloadInterval)
+	log.Printf("[INFO] initialized cluster sync interval of %ds", config.ReloadInterval)
 	channel := make(chan bool)
 	go func() {
 		for {
@@ -50,9 +50,9 @@ func Run() {
 				return
 			case <-ticker.C:
 				start := time.Now()
-				log.Printf("[INF] starting cluster sync\n")
+				log.Printf("[INFO] starting cluster sync\n")
 				cluster.Sync()
-				log.Printf("[INF] synced cluster in %fs\n", time.Since(start).Seconds())
+				log.Printf("[INFO] synced cluster in %fs\n", time.Since(start).Seconds())
 			}
 		}
 	}()
@@ -66,6 +66,17 @@ func Run() {
 		}
 	})
 
+	router.GET("/", func(c *gin.Context) {
+		v, err := cluster.Get()
+		if err != nil {
+			c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
+			return
+		} else {
+			c.JSON(http.StatusOK, gin.H{"cluster": v})
+			return
+		}
+	})
+
 	router.GET("/nodes/:node", func(c *gin.Context) {
 		nodeid := c.Param("node")
 
@@ -80,20 +91,6 @@ func Run() {
 		}
 	})
 
-	router.GET("/nodes/:node/devices", func(c *gin.Context) {
-		nodeid := c.Param("node")
-
-		node, err := cluster.GetNode(nodeid)
-
-		if err != nil {
-			c.JSON(http.StatusNotFound, gin.H{"error": err.Error()})
-			return
-		} else {
-			c.JSON(http.StatusOK, gin.H{"devices": node.Devices})
-			return
-		}
-	})
-
 	router.GET("/nodes/:node/instances/:vmid", func(c *gin.Context) {
 		nodeid := c.Param("node")
 		vmid, err := strconv.ParseUint(c.Param("vmid"), 10, 64)
@@ -122,14 +119,14 @@ func Run() {
 	router.POST("/sync", func(c *gin.Context) {
 		//go func() {
 		start := time.Now()
-		log.Printf("[INF] starting cluster sync\n")
+		log.Printf("[INFO] starting cluster sync\n")
 		err := cluster.Sync()
 		if err != nil {
-			log.Printf("[ERR] failed to sync cluster: %s", err)
+			log.Printf("[ERR ] failed to sync cluster: %s", err)
 			c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
 			return
 		} else {
-			log.Printf("[INF] synced cluster in %fs\n", time.Since(start).Seconds())
+			log.Printf("[INFO] synced cluster in %fs\n", time.Since(start).Seconds())
 			return
 		}
 		//}()
@@ -139,14 +136,14 @@ func Run() {
 		nodeid := c.Param("node")
 		//go func() {
 		start := time.Now()
-		log.Printf("[INF] starting %s sync\n", nodeid)
+		log.Printf("[INFO] starting %s sync\n", nodeid)
 		err := cluster.RebuildNode(nodeid)
 		if err != nil {
-			log.Printf("[ERR] failed to sync %s: %s", nodeid, err.Error())
+			log.Printf("[ERR ] failed to sync %s: %s", nodeid, err.Error())
 			c.JSON(http.StatusNotFound, gin.H{"error": err.Error()})
 			return
 		} else {
-			log.Printf("[INF] synced %s in %fs\n", nodeid, time.Since(start).Seconds())
+			log.Printf("[INFO] synced %s in %fs\n", nodeid, time.Since(start).Seconds())
 			return
 		}
 		//}()
@@ -162,34 +159,34 @@ func Run() {
 
 		//go func() {
 		start := time.Now()
-		log.Printf("[INF] starting %s.%d sync\n", nodeid, vmid)
+		log.Printf("[INFO] starting %s.%d sync\n", nodeid, vmid)
 
 		node, err := cluster.GetNode(nodeid)
 		if err != nil {
-			log.Printf("[ERR] failed to sync %s.%d: %s", nodeid, vmid, err.Error())
+			log.Printf("[ERR ] failed to sync %s.%d: %s", nodeid, vmid, err.Error())
 			c.JSON(http.StatusNotFound, gin.H{"error": err.Error()})
 			return
 		}
 
 		instance, err := node.GetInstance(uint(vmid))
 		if err != nil {
-			log.Printf("[ERR] failed to sync %s.%d: %s", nodeid, vmid, err.Error())
+			log.Printf("[ERR ] failed to sync %s.%d: %s", nodeid, vmid, err.Error())
 			c.JSON(http.StatusNotFound, gin.H{"error": err.Error()})
 			return
 		}
 
 		err = node.RebuildInstance(instance.Type, uint(vmid))
 		if err != nil {
-			log.Printf("[ERR] failed to sync %s.%d: %s", nodeid, vmid, err.Error())
+			log.Printf("[ERR ] failed to sync %s.%d: %s", nodeid, vmid, err.Error())
 			c.JSON(http.StatusNotFound, gin.H{"error": err.Error()})
 			return
 		} else {
-			log.Printf("[INF] synced %s.%d in %fs\n", nodeid, vmid, time.Since(start).Seconds())
+			log.Printf("[INFO] synced %s.%d in %fs\n", nodeid, vmid, time.Since(start).Seconds())
 			return
 		}
 		//}()
 	})
 
-	log.Printf("[INF] starting API listening on 0.0.0.0:%d", config.ListenPort)
+	log.Printf("[INFO] starting API listening on 0.0.0.0:%d", config.ListenPort)
 	router.Run("0.0.0.0:" + strconv.Itoa(config.ListenPort))
 }
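The new GET "/" route above returns the whole cluster model under a "cluster" key, alongside the existing per-node and per-instance routes. A minimal sketch of exercising it from Go follows; the host and port are illustrative assumptions (the real listen port comes from config.ListenPort), and the response shape is inferred from the gin.H payload in the handler, not confirmed output:

package main

import (
	"fmt"
	"io"
	"net/http"
)

func main() {
	// Assumes the API from app.go is running locally; port 8080 is hypothetical.
	resp, err := http.Get("http://localhost:8080/")
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	body, err := io.ReadAll(resp.Body)
	if err != nil {
		panic(err)
	}
	fmt.Println(resp.StatusCode, string(body)) // expected shape: {"cluster":{...}}
}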
diff --git a/app/model.go b/app/model.go
index 91a1a32..4c14d80 100644
--- a/app/model.go
+++ b/app/model.go
@@ -10,31 +10,52 @@ func (cluster *Cluster) Init(pve ProxmoxClient) {
 	cluster.pve = pve
 }
 
+func (cluster *Cluster) Get() (*Cluster, error) {
+	cluster_ch := make(chan *Cluster)
+	err_ch := make(chan error)
+
+	go func() {
+		// acquire cluster lock
+		cluster.lock.Lock()
+		defer cluster.lock.Unlock()
+		cluster_ch <- cluster
+		err_ch <- nil
+	}()
+
+	return <-cluster_ch, <-err_ch
+}
+
 // hard sync cluster
 func (cluster *Cluster) Sync() error {
-	// aquire lock on cluster, release on return
-	cluster.lock.Lock()
-	defer cluster.lock.Unlock()
+	err_ch := make(chan error)
 
-	cluster.Nodes = make(map[string]*Node)
+	go func() {
+		// acquire lock on cluster, release on return
+		cluster.lock.Lock()
+		defer cluster.lock.Unlock()
 
-	// get all nodes
-	nodes, err := cluster.pve.Nodes()
-	if err != nil {
-		return err
-	}
-	// for each node:
-	for _, hostName := range nodes {
-		// rebuild node
-		err := cluster.RebuildNode(hostName)
-		if err != nil { // if an error was encountered, continue and log the error
-			log.Printf("[ERR] %s", err)
-		} else { // otherwise log success
-			log.Printf("[INF] successfully synced node %s", hostName)
+		cluster.Nodes = make(map[string]*Node)
+
+		// get all nodes
+		nodes, err := cluster.pve.Nodes()
+		if err != nil {
+			err_ch <- err
+			return
 		}
-	}
+		// for each node:
+		for _, hostName := range nodes {
+			// rebuild node
+			err := cluster.RebuildNode(hostName)
+			if err != nil { // if an error was encountered, continue and log the error
+				log.Printf("[ERR ] %s", err)
+			} else { // otherwise log success
+				log.Printf("[INFO] successfully synced node %s", hostName)
+			}
+		}
+		err_ch <- nil
+	}()
 
-	return nil
+	return <-err_ch
 }
 
 // get a node in the cluster
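Get and Sync now share one shape: the critical section runs in a goroutine that holds the cluster mutex, and the caller blocks on an unbuffered channel until that goroutine reports completion, so the exported methods remain synchronous. A minimal, self-contained sketch of the pattern with illustrative names (not project code):

package main

import (
	"log"
	"sync"
)

type counter struct {
	lock  sync.Mutex
	total int
}

// add performs its work in a goroutine that holds the lock, then signals
// completion over an unbuffered channel; the receive below blocks, so the
// method still behaves synchronously from the caller's point of view.
func (c *counter) add(n int) error {
	err_ch := make(chan error)
	go func() {
		c.lock.Lock()
		defer c.lock.Unlock()
		c.total += n
		err_ch <- nil
	}()
	return <-err_ch
}

func main() {
	c := &counter{}
	if err := c.add(3); err != nil {
		log.Fatal(err)
	}
	log.Printf("[INFO] total = %d", c.total)
}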
@@ -61,72 +82,74 @@ func (cluster *Cluster) GetNode(hostName string) (*Node, error) {
 		}
 	}()
 
-	host := <-host_ch
-	err := <-err_ch
-
-	return host, err
+	return <-host_ch, <-err_ch
 }
 
 // hard sync node
 // returns error if the node could not be reached
 func (cluster *Cluster) RebuildNode(hostName string) error {
-	host, err := cluster.pve.Node(hostName)
-	if err != nil && cluster.Nodes[hostName] == nil { // host is unreachable and did not exist previously
-		// return an error because we requested to sync a node that was not already in the cluster
-		return fmt.Errorf("error retrieving %s: %s", hostName, err.Error())
-	}
+	err_ch := make(chan error)
 
-	// aquire lock on host, release on return
-	host.lock.Lock()
-	defer host.lock.Unlock()
-
-	if err != nil && cluster.Nodes[hostName] != nil { // host is unreachable and did exist previously
-		// assume the node is down or gone and delete from cluster
-		delete(cluster.Nodes, hostName)
-		return nil
-	}
-
-	cluster.Nodes[hostName] = host
-
-	// get node's VMs
-	vms, err := host.VirtualMachines()
-	if err != nil {
-		return err
-
-	}
-	for _, vmid := range vms {
-		err := host.RebuildInstance(VM, vmid)
-		if err != nil { // if an error was encountered, continue and log the error
-			log.Printf("[ERR] %s", err)
-		} else {
-			log.Printf("[INF] successfully synced vm %s.%d", hostName, vmid)
+	go func() {
+		host, err := cluster.pve.Node(hostName)
+		if err != nil && cluster.Nodes[hostName] == nil { // host is unreachable and did not exist previously
+			// return an error because we requested to sync a node that was not already in the cluster
+			err_ch <- fmt.Errorf("error retrieving %s: %s", hostName, err.Error())
+			return
 		}
-	}
-	// get node's CTs
-	cts, err := host.Containers()
-	if err != nil {
-		return err
-	}
-	for _, vmid := range cts {
-		err := host.RebuildInstance(CT, vmid)
-		if err != nil { // if an error was encountered, continue and log the error
-			log.Printf("[ERR] %s", err)
-		} else {
-			log.Printf("[INF] successfully synced ct %s.%d", hostName, vmid)
+		// acquire lock on host, release on return
+		host.lock.Lock()
+		defer host.lock.Unlock()
+
+		if err != nil && cluster.Nodes[hostName] != nil { // host is unreachable and did exist previously
+			// assume the node is down or gone and delete from cluster
+			delete(cluster.Nodes, hostName)
+			err_ch <- nil
+			return
 		}
-	}
-	// check node device reserved by iterating over each function, we will assume that a single reserved function means the device is also reserved
-	for _, device := range host.Devices {
-		reserved := false
-		for _, function := range device.Functions {
-			reserved = reserved || function.Reserved
+		cluster.Nodes[hostName] = host
+
+		// get node's VMs
+		vms, err := host.VirtualMachines()
+		if err != nil {
+			err_ch <- err
+			return
+		}
+		for _, vmid := range vms {
+			err := host.RebuildInstance(VM, vmid)
+			if err != nil { // if an error was encountered, continue and log the error
+				log.Printf("[ERR ] %s", err)
+			} else {
+				log.Printf("[INFO] successfully synced vm %s.%d", hostName, vmid)
+			}
 		}
-		device.Reserved = reserved
-	}
 
-	return nil
+		// get node's CTs
+		cts, err := host.Containers()
+		if err != nil {
+			err_ch <- err
+			return
+		}
+		for _, vmid := range cts {
+			err := host.RebuildInstance(CT, vmid)
+			if err != nil { // if an error was encountered, continue and log the error
+				log.Printf("[ERR ] %s", err)
+			} else {
+				log.Printf("[INFO] successfully synced ct %s.%d", hostName, vmid)
+			}
+		}
+
+		// check node device reserved by iterating over each function, we will assume that a single reserved function means the device is also reserved
+		for _, device := range host.Devices {
+			reserved := false
+			for _, function := range device.Functions {
+				reserved = reserved || function.Reserved
+			}
+			device.Reserved = reserved
+		}
+
+		err_ch <- nil
+	}()
+	return <-err_ch
 }
 
 func (host *Node) GetInstance(vmid uint) (*Instance, error) {
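The caller of RebuildNode receives from err_ch exactly once, so every early send inside the goroutine is followed by a return; without it, execution would fall through to a second send on an unbuffered channel with no receiver left, blocking the goroutine forever while it still holds the node lock. A small sketch of that send-then-return rule, with illustrative names (not project code):

package main

import (
	"errors"
	"log"
)

func rebuild(fail bool) error {
	err_ch := make(chan error)
	go func() {
		if fail {
			err_ch <- errors.New("node unreachable")
			return // without this return, the goroutine would fall through and block on the send below
		}
		// ... normal rebuild work would run here ...
		err_ch <- nil // exactly one send matches the single receive in the caller
	}()
	return <-err_ch
}

func main() {
	if err := rebuild(true); err != nil {
		log.Printf("[ERR ] %s", err)
	}
	if err := rebuild(false); err == nil {
		log.Printf("[INFO] rebuild succeeded")
	}
}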
@@ -152,58 +175,62 @@ func (host *Node) GetInstance(vmid uint) (*Instance, error) {
 		}
 	}()
 
-	instance := <-instance_ch
-	err := <-err_ch
-	return instance, err
+	return <-instance_ch, <-err_ch
 }
 
 // hard sync instance
 // returns error if the instance could not be reached
 func (host *Node) RebuildInstance(instancetype InstanceType, vmid uint) error {
-	instanceID := InstanceID(vmid)
-	var instance *Instance
-	var err error
-	if instancetype == VM {
-		instance, err = host.VirtualMachine(vmid)
-	} else if instancetype == CT {
-		instance, err = host.Container(vmid)
+	err_ch := make(chan error)
 
-	}
+	go func() {
+		instanceID := InstanceID(vmid)
+		var instance *Instance
+		var err error
+		if instancetype == VM {
+			instance, err = host.VirtualMachine(vmid)
+		} else if instancetype == CT {
+			instance, err = host.Container(vmid)
 
-	if err != nil && host.Instances[instanceID] == nil { // instance is unreachable and did not exist previously
-		// return an error because we requested to sync an instance that was not already in the cluster
-		return fmt.Errorf("error retrieving %d.%d: %s", host.Name, instanceID, err.Error())
-	}
+		}
 
-	// aquire lock on instance, release on return
-	instance.lock.Lock()
-	defer instance.lock.Unlock()
+		if err != nil && host.Instances[instanceID] == nil { // instance is unreachable and did not exist previously
+			// return an error because we requested to sync an instance that was not already in the cluster
+			err_ch <- fmt.Errorf("error retrieving %s.%d: %s", host.Name, instanceID, err.Error())
+			return
+		}
 
-	if err != nil && host.Instances[instanceID] != nil { // host is unreachable and did exist previously
-		// assume the instance is gone and delete from cluster
-		delete(host.Instances, instanceID)
-		return nil
-	}
+		// acquire lock on instance, release on return
+		instance.lock.Lock()
+		defer instance.lock.Unlock()
 
-	host.Instances[instanceID] = instance
+		if err != nil && host.Instances[instanceID] != nil { // host is unreachable and did exist previously
+			// assume the instance is gone and delete from cluster
+			delete(host.Instances, instanceID)
+			err_ch <- nil
+			return
+		}
 
-	for volid := range instance.configDisks {
-		instance.RebuildVolume(host, volid)
-	}
+		host.Instances[instanceID] = instance
 
-	for netid := range instance.configNets {
-		instance.RebuildNet(netid)
-	}
+		for volid := range instance.configDisks {
+			instance.RebuildVolume(host, volid)
+		}
 
-	for deviceid := range instance.configHostPCIs {
-		instance.RebuildDevice(host, deviceid)
-	}
+		for netid := range instance.configNets {
+			instance.RebuildNet(netid)
+		}
 
-	if instance.Type == VM {
-		instance.RebuildBoot()
-	}
+		for deviceid := range instance.configHostPCIs {
+			instance.RebuildDevice(host, deviceid)
+		}
 
-	return nil
+		if instance.Type == VM {
+			instance.RebuildBoot()
+		}
+
+		err_ch <- nil
+	}()
+
+	return <-err_ch
 }
 
 func (instance *Instance) RebuildVolume(host *Node, volid string) error {
@@ -239,7 +266,7 @@ func (instance *Instance) RebuildNet(netid string) error {
 
 func (instance *Instance) RebuildDevice(host *Node, deviceid string) {
 	instanceDevice, ok := instance.configHostPCIs[deviceid]
 	if !ok { // if device does not exist
-		log.Printf("%s not found in devices", deviceid)
+		log.Printf("[WARN] %s not found in devices", deviceid)
 		return
 	}
@@ -256,7 +283,6 @@ func (instance *Instance) RebuildDevice(host *Node, deviceid string) {
 	}
 
 	instance.Devices[DeviceID(instanceDeviceBusID)].Device_ID = DeviceID(deviceid)
-	instance.Devices[DeviceID(instanceDeviceBusID)].Value = instanceDevice
 }
 
 func (instance *Instance) RebuildBoot() {
@@ -285,7 +311,7 @@ func (instance *Instance) RebuildBoot() {
 			instance.Boot.Enabled = append(instance.Boot.Enabled, val)
 			delete(eligibleBoot, bootTarget)
 		} else { // item is not eligible for boot but is included in the boot order
-			log.Printf("Encountered enabled but non-eligible boot target %s in instance %s\n", bootTarget, instance.Name)
+			log.Printf("[WARN] encountered enabled but non-eligible boot target %s in instance %s\n", bootTarget, instance.Name)
 			delete(eligibleBoot, bootTarget)
 		}
 	}
@@ -297,7 +323,7 @@
 		} else if val, ok := instance.Nets[NetID(bootTarget)]; ok && isEligible { // if the item is eligible and is in nets
 			instance.Boot.Disabled = append(instance.Boot.Disabled, val)
 		} else { // item is not eligible and is not already in the boot order, skip adding to model
-			log.Printf("Encountered disabled and non-eligible boot target %s in instance %s\n", bootTarget, instance.Name)
+			log.Printf("[WARN] encountered disabled and non-eligible boot target %s in instance %s\n", bootTarget, instance.Name)
 		}
 	}
 }
diff --git a/app/types.go b/app/types.go
index 0573adf..5526155 100644
--- a/app/types.go
+++ b/app/types.go
@@ -9,7 +9,7 @@ import (
 type Cluster struct {
 	lock  sync.Mutex
 	pve   ProxmoxClient
-	Nodes map[string]*Node
+	Nodes map[string]*Node `json:"nodes"`
 }
 
 type Node struct {
@@ -88,7 +88,6 @@ type Device struct {
 	Vendor_Name string                   `json:"vendor_name"`
 	Functions   map[FunctionID]*Function `json:"functions"`
 	Reserved    bool                     `json:"reserved"`
-	Value       string
 }
 
 type FunctionID string
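The struct tag added to Cluster.Nodes controls the key that encoding/json uses when the handlers above marshal the cluster; without a tag the exported field name is emitted verbatim. A small stand-alone illustration using stand-in types (not project code):

package main

import (
	"encoding/json"
	"fmt"
)

type tagged struct {
	Nodes map[string]int `json:"nodes"`
}

type untagged struct {
	Nodes map[string]int
}

func main() {
	a, _ := json.Marshal(tagged{Nodes: map[string]int{"node1": 1}})
	b, _ := json.Marshal(untagged{Nodes: map[string]int{"node1": 1}})
	fmt.Println(string(a)) // {"nodes":{"node1":1}}
	fmt.Println(string(b)) // {"Nodes":{"node1":1}}
}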