diff --git a/app/app.go b/app/app.go index bd1a135..017535e 100644 --- a/app/app.go +++ b/app/app.go @@ -36,7 +36,7 @@ func Run() { cluster.Init(client) start := time.Now() log.Printf("[INFO] starting cluster sync\n") - err := cluster.Sync() + err := SyncCluster(&cluster) if err != nil { log.Printf("[Error] error encountered while syncing cluster: %s", err) } else { @@ -55,7 +55,7 @@ func Run() { case <-ticker.C: start := time.Now() log.Printf("[INFO] starting cluster sync\n") - err := cluster.Sync() + err := SyncCluster(&cluster) if err != nil { log.Printf("[ERR ] error encountered while syncing cluster: %s", err) } else { @@ -139,7 +139,7 @@ func Run() { router.POST("/sync", func(c *gin.Context) { start := time.Now() log.Printf("[INFO] starting cluster sync\n") - err := cluster.Sync() + err := SyncCluster(&cluster) if err != nil { log.Printf("[ERR ] failed to sync cluster: %s", err) c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()}) @@ -151,64 +151,33 @@ func Run() { }) router.POST("/nodes/:node/sync", func(c *gin.Context) { - nodeid := c.Param("node") + nodeName := c.Param("node") start := time.Now() - log.Printf("[INFO] starting %s sync\n", nodeid) - err := cluster.RebuildNode(nodeid) + log.Printf("[INFO] starting %s sync\n", nodeName) + err := SyncNode(&cluster, nodeName) if err != nil { - log.Printf("[ERR ] failed to sync %s: %s", nodeid, err.Error()) + log.Printf("[ERR ] failed to sync %s: %s", nodeName, err.Error()) c.JSON(http.StatusNotFound, gin.H{"error": err.Error()}) - } - - // after synchronizing a node, resync pool membership - err = cluster.ResolvePoolMembership() - if err != nil { - log.Printf("[ERR ] failed to sync %s: %s", nodeid, err.Error()) - c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()}) } else { - log.Printf("[INFO] synced %s in %fs\n", nodeid, time.Since(start).Seconds()) + log.Printf("[INFO] synced %s in %fs\n", nodeName, time.Since(start).Seconds()) } }) router.POST("/nodes/:node/instances/:vmid/sync", func(c *gin.Context) { - nodeid := c.Param("node") + nodeName := c.Param("node") vmid, err := strconv.ParseUint(c.Param("vmid"), 10, 64) if err != nil { c.JSON(http.StatusInternalServerError, gin.H{"error": fmt.Sprintf("%s could not be converted to vmid (uint)", c.Param("instance"))}) return } - - //go func() { start := time.Now() - log.Printf("[INFO] starting %s.%d sync\n", nodeid, vmid) - - node, err := cluster.GetNode(nodeid) + log.Printf("[INFO] starting %s.%d sync\n", nodeName, vmid) + err = SyncInstance(&cluster, nodeName, uint(vmid)) if err != nil { - log.Printf("[ERR ] failed to sync %s.%d: %s", nodeid, vmid, err.Error()) + log.Printf("[ERR ] failed to sync %s.%d: %s", nodeName, vmid, err.Error()) c.JSON(http.StatusNotFound, gin.H{"error": err.Error()}) - return - } - - instance, err := node.GetInstance(uint(vmid)) - if err != nil { - log.Printf("[ERR ] failed to sync %s.%d: %s", nodeid, vmid, err.Error()) - c.JSON(http.StatusNotFound, gin.H{"error": err.Error()}) - return - } - - err = node.RebuildInstance(instance.Type, uint(vmid)) - if err != nil { - log.Printf("[ERR ] failed to sync %s.%d: %s", nodeid, vmid, err.Error()) - c.JSON(http.StatusNotFound, gin.H{"error": err.Error()}) - } - - // after synchronizing a node, resync pool membership - err = cluster.ResolvePoolMembership() - if err != nil { - log.Printf("[ERR ] failed to sync %s: %s", nodeid, err.Error()) - c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()}) } else { - log.Printf("[INFO] synced %s in %fs\n", nodeid, time.Since(start).Seconds()) + log.Printf("[INFO] synced %s in %fs\n", nodeName, time.Since(start).Seconds()) } }) diff --git a/app/model.go b/app/model.go index 95d625a..61fa5ef 100644 --- a/app/model.go +++ b/app/model.go @@ -27,12 +27,28 @@ func (cluster *Cluster) Get() (*Cluster, error) { } } +func SyncCluster(cluster *Cluster) error { + cluster.OK = false + + err := cluster.BuildCluster() + if err != nil { + return err + } + + err = cluster.ResolvePoolMembership() + if err != nil { + return err + } + + cluster.OK = true + return nil +} + // hard sync cluster -func (cluster *Cluster) Sync() error { +func (cluster *Cluster) BuildCluster() error { // aquire lock on cluster, release on return cluster.lock.Lock() - - cluster.OK = false + defer cluster.lock.Unlock() cluster.Nodes = make(map[string]*Node) @@ -46,15 +62,15 @@ func (cluster *Cluster) Sync() error { } // for each node: - for _, hostName := range nodes { + for _, nodeName := range nodes { wg.Go(func() error { start := time.Now() // rebuild node - err := cluster.RebuildNode(hostName) + err := cluster.BuildNode(nodeName) if err != nil { // if an error was encountered, continue and log the error - log.Printf("[ERR ] error encountered while syncing node %s: %s", hostName, err) + log.Printf("[ERR ] error encountered while syncing node %s: %s", nodeName, err) } else { - log.Printf("[INFO] synced node %s in %d ms", hostName, time.Since(start).Milliseconds()) + log.Printf("[INFO] synced node %s in %d ms", nodeName, time.Since(start).Milliseconds()) } return err }) @@ -66,17 +82,6 @@ func (cluster *Cluster) Sync() error { return err } - cluster.lock.Unlock() - - err = cluster.ResolvePoolMembership() - if err != nil { - return err - } - - cluster.lock.Lock() - cluster.OK = true - cluster.lock.Unlock() - return nil } @@ -115,49 +120,66 @@ func (cluster *Cluster) ResolvePoolMembership() error { } // get a node in the cluster -func (cluster *Cluster) GetNode(hostName string) (*Node, error) { +func (cluster *Cluster) GetNode(nodeName string) (*Node, error) { // aquire cluster lock cluster.lock.Lock() defer cluster.lock.Unlock() - // get host - host, ok := cluster.Nodes[hostName] + // get node + node, ok := cluster.Nodes[nodeName] if !ok { - return nil, fmt.Errorf("%s not in cluster", hostName) + return nil, fmt.Errorf("%s not in cluster", nodeName) } else { - // aquire host lock to wait in case of a concurrent write - host.lock.Lock() - defer host.lock.Unlock() + // aquire node lock to wait in case of a concurrent write + node.lock.Lock() + defer node.lock.Unlock() - return host, nil + return node, nil } } +func SyncNode(cluster *Cluster, nodeName string) error { + cluster.OK = false + + err := cluster.BuildNode(nodeName) + if err != nil { + return err + } + + err = cluster.ResolvePoolMembership() + if err != nil { + return err + } + + cluster.OK = true + return nil +} + // hard sync node // returns error if the node could not be reached -func (cluster *Cluster) RebuildNode(hostName string) error { - host, err := cluster.pve.Node(hostName) - if err != nil && cluster.Nodes[hostName] == nil { // host is unreachable and did not exist previously +func (cluster *Cluster) BuildNode(nodeName string) error { + node, err := cluster.pve.Node(nodeName) + if err != nil && cluster.Nodes[nodeName] == nil { // node is unreachable and did not exist previously // return an error because we requested to sync a node that was not already in the cluster - return fmt.Errorf("error retrieving %s: %s", hostName, err.Error()) + return fmt.Errorf("error retrieving %s: %s", nodeName, err.Error()) } - // aquire lock on host, release on return - host.lock.Lock() - defer host.lock.Unlock() + // aquire lock on node, release on return + node.lock.Lock() + defer node.lock.Unlock() wg, _ := errgroup.WithContext(context.Background()) - if err != nil && cluster.Nodes[hostName] != nil { // host is unreachable and did exist previously + if err != nil && cluster.Nodes[nodeName] != nil { // node is unreachable and did exist previously // assume the node is down or gone and delete from cluster - delete(cluster.Nodes, hostName) + delete(cluster.Nodes, nodeName) return nil } - cluster.Nodes[hostName] = host + cluster.Nodes[nodeName] = node // get node's VMs - vms, err := host.VirtualMachines() + vms, err := node.VirtualMachines() if err != nil { return err @@ -165,29 +187,29 @@ func (cluster *Cluster) RebuildNode(hostName string) error { for _, vmid := range vms { wg.Go(func() error { start := time.Now() - err := host.RebuildInstance(VM, vmid) + err := node.BuildInstance(VM, vmid) if err != nil { // if an error was encountered, continue and log the error - log.Printf("[ERR ] error encountered while syncing vm %s.%d: %s", hostName, vmid, err) + log.Printf("[ERR ] error encountered while syncing vm %s.%d: %s", nodeName, vmid, err) } else { - log.Printf("[INFO] synced vm %s.%d in %d ms", hostName, vmid, time.Since(start).Milliseconds()) + log.Printf("[INFO] synced vm %s.%d in %d ms", nodeName, vmid, time.Since(start).Milliseconds()) } return err }) } // get node's CTs - cts, err := host.Containers() + cts, err := node.Containers() if err != nil { return err } for _, vmid := range cts { wg.Go(func() error { start := time.Now() - err := host.RebuildInstance(CT, vmid) + err := node.BuildInstance(CT, vmid) if err != nil { // if an error was encountered, continue and log the error - log.Printf("[ERR ] error encountered while syncing ct %s.%d: %s", hostName, vmid, err) + log.Printf("[ERR ] error encountered while syncing ct %s.%d: %s", nodeName, vmid, err) } else { - log.Printf("[INFO] synced ct %s.%d in %d ms", hostName, vmid, time.Since(start).Milliseconds()) + log.Printf("[INFO] synced ct %s.%d in %d ms", nodeName, vmid, time.Since(start).Milliseconds()) } return err @@ -200,7 +222,7 @@ func (cluster *Cluster) RebuildNode(hostName string) error { } // check node device reserved by iterating over each function, we will assume that a single reserved function means the device is also reserved - for _, device := range host.Devices { + for _, device := range node.Devices { reserved := false for _, function := range device.Functions { reserved = reserved || function.Reserved @@ -208,18 +230,20 @@ func (cluster *Cluster) RebuildNode(hostName string) error { device.Reserved = reserved } + node.cluster = cluster + return nil } -func (host *Node) GetInstance(vmid uint) (*Instance, error) { - // aquire host lock - host.lock.Lock() - defer host.lock.Unlock() +func (node *Node) GetInstance(vmid uint) (*Instance, error) { + // aquire node lock + node.lock.Lock() + defer node.lock.Unlock() // get instance - instance, ok := host.Instances[InstanceID(vmid)] + instance, ok := node.Instances[InstanceID(vmid)] if !ok { - return nil, fmt.Errorf("vmid %d not in host %s", vmid, host.Name) + return nil, fmt.Errorf("vmid %d not in node %s", vmid, node.Name) } else { // aquire instance lock to wait in case of a concurrent write instance.lock.Lock() @@ -229,23 +253,50 @@ func (host *Node) GetInstance(vmid uint) (*Instance, error) { } } +func SyncInstance(cluster *Cluster, nodeName string, vmid uint) error { + cluster.OK = false + + node, err := cluster.GetNode(nodeName) + if err != nil { + return err + } + + instance, err := node.GetInstance(uint(vmid)) + if err != nil { + return err + } + + err = node.BuildInstance(instance.Type, uint(vmid)) + if err != nil { + return err + } + + err = cluster.ResolvePoolMembership() + if err != nil { + return err + } + + cluster.OK = true + return nil +} + // hard sync instance // returns error if the instance could not be reached -func (host *Node) RebuildInstance(instancetype InstanceType, vmid uint) error { +func (node *Node) BuildInstance(instancetype InstanceType, vmid uint) error { instanceID := InstanceID(vmid) var instance *Instance var err error switch instancetype { case VM: - instance, err = host.VirtualMachine(vmid) + instance, err = node.VirtualMachine(vmid) case CT: - instance, err = host.Container(vmid) + instance, err = node.Container(vmid) } - if err != nil && host.Instances[instanceID] == nil { // instance is unreachable and did not exist previously + if err != nil && node.Instances[instanceID] == nil { // instance is unreachable and did not exist previously // return an error because we requested to sync an instance that was not already in the cluster - return fmt.Errorf("error retrieving %s.%d: %s", host.Name, instanceID, err.Error()) + return fmt.Errorf("error retrieving %s.%d: %s", node.Name, instanceID, err.Error()) } // aquire lock on instance, release on return @@ -254,17 +305,17 @@ func (host *Node) RebuildInstance(instancetype InstanceType, vmid uint) error { wg, _ := errgroup.WithContext(context.Background()) - if err != nil && host.Instances[instanceID] != nil { // host is unreachable and did exist previously + if err != nil && node.Instances[instanceID] != nil { // node is unreachable and did exist previously // assume the instance is gone and delete from cluster - delete(host.Instances, instanceID) + delete(node.Instances, instanceID) return nil } - host.Instances[instanceID] = instance + node.Instances[instanceID] = instance for volid := range instance.configDisks { wg.Go(func() error { - err = instance.RebuildVolume(host, volid) + err = instance.RebuildVolume(node, volid) if err != nil { log.Printf("[ERR ] error rebuilding volume %s: %s", volid, err) } @@ -274,7 +325,7 @@ func (host *Node) RebuildInstance(instancetype InstanceType, vmid uint) error { for netid := range instance.configNets { wg.Go(func() error { - err = instance.RebuildNet(host, netid) + err = instance.RebuildNet(node, netid) if err != nil { log.Printf("[ERR ] error rebuilding net %s: %s", netid, err) return err @@ -285,7 +336,7 @@ func (host *Node) RebuildInstance(instancetype InstanceType, vmid uint) error { for deviceid := range instance.configHostPCIs { wg.Go(func() error { - err = instance.RebuildDevice(host, deviceid) + err = instance.RebuildDevice(node, deviceid) if err != nil { log.Printf("[ERR ] error rebuilding pci %s: %s", deviceid, err) } @@ -299,20 +350,21 @@ func (host *Node) RebuildInstance(instancetype InstanceType, vmid uint) error { } if instance.Type == VM { - err = instance.RebuildBoot(host) + err = instance.RebuildBoot(node) if err != nil { log.Printf("[ERR ] error rebuilding boot: %s", err) } return err - } else { - return nil } + + instance.node = node + return nil } -func (instance *Instance) RebuildVolume(host *Node, volid string) error { +func (instance *Instance) RebuildVolume(node *Node, volid string) error { volumeDataString := instance.configDisks[volid] - volume, err := GetVolumeInfo(host, volumeDataString) + volume, err := GetVolumeInfo(node, volumeDataString) if err != nil { return err } @@ -325,7 +377,7 @@ func (instance *Instance) RebuildVolume(host *Node, volid string) error { return nil } -func (instance *Instance) RebuildNet(host *Node, netid string) error { +func (instance *Instance) RebuildNet(node *Node, netid string) error { net := instance.configNets[netid] netinfo, err := GetNetInfo(net) @@ -339,10 +391,10 @@ func (instance *Instance) RebuildNet(host *Node, netid string) error { return nil } -func (instance *Instance) RebuildDevice(host *Node, deviceid string) error { +func (instance *Instance) RebuildDevice(node *Node, deviceid string) error { instanceDevice, ok := instance.configHostPCIs[deviceid] if !ok { // if device does not exist - log.Printf("[WARN] %s not found in devices on node %s", deviceid, host.Name) + log.Printf("[WARN] %s not found in devices on node %s", deviceid, node.Name) return nil } @@ -350,7 +402,7 @@ func (instance *Instance) RebuildDevice(host *Node, deviceid string) error { instanceDeviceBusID := DeviceID(deviceid) if DeviceBusIDIsSuperDevice(hostDeviceBusID) { - instance.Devices[DeviceID(instanceDeviceBusID)] = host.Devices[DeviceBus(hostDeviceBusID)] + instance.Devices[DeviceID(instanceDeviceBusID)] = node.Devices[DeviceBus(hostDeviceBusID)] for _, function := range instance.Devices[DeviceID(instanceDeviceBusID)].Functions { function.Reserved = true } @@ -363,7 +415,7 @@ func (instance *Instance) RebuildDevice(host *Node, deviceid string) error { return nil } -func (instance *Instance) RebuildBoot(host *Node) error { +func (instance *Instance) RebuildBoot(node *Node) error { instance.Boot = BootOrder{} eligibleBoot := map[string]bool{} diff --git a/app/types.go b/app/types.go index beb91cb..86aeaaf 100644 --- a/app/types.go +++ b/app/types.go @@ -28,6 +28,7 @@ type Node struct { Instances map[InstanceID]*Instance `json:"instances"` pvenode *proxmox.Node storage map[string][]*proxmox.StorageContent + cluster *Cluster } type InstanceID = paas.InstanceID @@ -45,6 +46,7 @@ type Instance struct { configNets map[string]string configHostPCIs map[string]string configBoot string + node *Node } type VolumeID = paas.VolumeID