diff --git a/app/app.go b/app/app.go index be6fc81..16fdb70 100644 --- a/app/app.go +++ b/app/app.go @@ -34,10 +34,14 @@ func Run() { cluster := Cluster{} cluster.Init(client) - cluster.Rebuild() + start := time.Now() + log.Printf("Starting cluster sync\n") + cluster.Sync() + log.Printf("Synced cluster in %fs\n", time.Since(start).Seconds()) // set repeating update for full rebuilds - ticker := time.NewTicker(5 * time.Second) + ticker := time.NewTicker(time.Duration(config.ReloadInterval) * time.Second) + log.Printf("Initialized cluster sync interval of %ds", config.ReloadInterval) channel := make(chan bool) go func() { for { @@ -45,8 +49,10 @@ func Run() { case <-channel: return case <-ticker.C: - cluster.Rebuild() - log.Printf("rebuilt cluster\n") + start := time.Now() + log.Printf("Starting cluster sync\n") + cluster.Sync() + log.Printf("Synced cluster in %fs\n", time.Since(start).Seconds()) } } }() @@ -62,34 +68,38 @@ func Run() { router.GET("/nodes/:node", func(c *gin.Context) { node := c.Param("node") - Host, ok := cluster.Hosts[node] - if !ok { - c.JSON(http.StatusInternalServerError, gin.H{"error": fmt.Sprintf("%s not found in cluster", node)}) + + host, err := cluster.GetHost(node) + + if err != nil { + c.JSON(http.StatusInternalServerError, gin.H{"error": err}) return } else { - c.JSON(http.StatusOK, gin.H{"node": Host}) + c.JSON(http.StatusOK, gin.H{"node": host}) return } }) router.GET("/nodes/:node/instances/:instance", func(c *gin.Context) { - host := c.Param("node") + node := c.Param("node") vmid, err := strconv.ParseUint(c.Param("instance"), 10, 64) if err != nil { c.JSON(http.StatusInternalServerError, gin.H{"error": fmt.Sprintf("%s could not be converted to vmid (uint)", c.Param("instance"))}) return } - Node, ok := cluster.Hosts[host] - if !ok { - c.JSON(http.StatusInternalServerError, gin.H{"error": fmt.Sprintf("vmid %s not found in cluster", host)}) + + host, err := cluster.GetHost(node) + + if err != nil { + c.JSON(http.StatusInternalServerError, gin.H{"error": fmt.Sprintf("vmid %s not found in cluster", node)}) return } else { - Instance, ok := Node.Instances[uint(vmid)] - if !ok { - c.JSON(http.StatusInternalServerError, gin.H{"error": fmt.Sprintf("%d not found in %s", vmid, host)}) + instance, err := host.GetInstance(uint(vmid)) + if err != nil { + c.JSON(http.StatusInternalServerError, gin.H{"error": fmt.Sprintf("%d not found in %s", vmid, node)}) return } else { - c.JSON(http.StatusOK, gin.H{"instance": Instance}) + c.JSON(http.StatusOK, gin.H{"instance": instance}) return } } diff --git a/app/model.go b/app/model.go index 02839d8..2435a65 100644 --- a/app/model.go +++ b/app/model.go @@ -6,16 +6,15 @@ import ( "strings" ) -type Cluster struct { - pve ProxmoxClient - Hosts map[string]*Host -} - func (cluster *Cluster) Init(pve ProxmoxClient) { cluster.pve = pve } -func (cluster *Cluster) Rebuild() error { +func (cluster *Cluster) Sync() error { + // aquire lock on cluster, release on return + cluster.lock.Lock() + defer cluster.lock.Unlock() + cluster.Hosts = make(map[string]*Host) // get all nodes @@ -26,7 +25,7 @@ func (cluster *Cluster) Rebuild() error { // for each node: for _, hostName := range nodes { // rebuild node - err := cluster.RebuildNode(hostName) + err := cluster.RebuildHost(hostName) if err != nil { return err } @@ -35,17 +34,51 @@ func (cluster *Cluster) Rebuild() error { return nil } -func (cluster *Cluster) RebuildNode(hostName string) error { +// get a node in the cluster +func (cluster *Cluster) GetHost(hostName string) (*Host, error) { + host_ch := make(chan *Host) + err_ch := make(chan error) + + go func() { + // aquire cluster lock + cluster.lock.Lock() + defer cluster.lock.Unlock() + // get host + host, ok := cluster.Hosts[hostName] + if !ok { + host_ch <- nil + err_ch <- fmt.Errorf("%s not in cluster", hostName) + } + // aquire host lock to wait in case of a concurrent write + host.lock.Lock() + defer host.lock.Unlock() + + host_ch <- host + err_ch <- nil + }() + + host := <-host_ch + err := <-err_ch + return host, err +} + +func (cluster *Cluster) RebuildHost(hostName string) error { host, err := cluster.pve.Node(hostName) if err != nil { return err } - cluster.Hosts[hostName] = &host + + // aquire lock on host, release on return + host.lock.Lock() + defer host.lock.Unlock() + + cluster.Hosts[hostName] = host // get node's VMs vms, err := host.VirtualMachines() if err != nil { return err + } for _, vmid := range vms { err := host.RebuildVM(vmid) @@ -69,13 +102,44 @@ func (cluster *Cluster) RebuildNode(hostName string) error { return nil } +func (host *Host) GetInstance(vmid uint) (*Instance, error) { + instance_ch := make(chan *Instance) + err_ch := make(chan error) + + go func() { + // aquire host lock + host.lock.Lock() + defer host.lock.Unlock() + // get instance + instance, ok := host.Instances[vmid] + if !ok { + instance_ch <- nil + err_ch <- fmt.Errorf("vmid %d not in host %s", vmid, host.Name) + } + // aquire instance lock to wait in case of a concurrent write + instance.lock.Lock() + defer instance.lock.Unlock() + + instance_ch <- instance + err_ch <- nil + }() + + instance := <-instance_ch + err := <-err_ch + return instance, err +} + func (host *Host) RebuildVM(vmid uint) error { instance, err := host.VirtualMachine(vmid) if err != nil { return err } - host.Instances[vmid] = &instance + // aquire lock on instance, release on return + instance.lock.Lock() + defer instance.lock.Unlock() + + host.Instances[vmid] = instance for volid := range instance.configDisks { instance.RebuildVolume(host, volid) @@ -86,7 +150,7 @@ func (host *Host) RebuildVM(vmid uint) error { } for deviceid := range instance.configHostPCIs { - instance.RebuildDevice(*host, deviceid) + instance.RebuildDevice(host, deviceid) } return nil @@ -98,7 +162,11 @@ func (host *Host) RebuildCT(vmid uint) error { return err } - host.Instances[vmid] = &instance + // aquire lock on instance, release on return + instance.lock.Lock() + defer instance.lock.Unlock() + + host.Instances[vmid] = instance for volid := range instance.configDisks { instance.RebuildVolume(host, volid) @@ -114,12 +182,12 @@ func (host *Host) RebuildCT(vmid uint) error { func (instance *Instance) RebuildVolume(host *Host, volid string) error { volumeDataString := instance.configDisks[volid] - volume, _, _, err := GetVolumeInfo(*host, volumeDataString) + volume, _, _, err := GetVolumeInfo(host, volumeDataString) if err != nil { return err } - instance.Volumes[volid] = &volume + instance.Volumes[volid] = volume return nil } @@ -136,12 +204,12 @@ func (instance *Instance) RebuildNet(netid string) error { return nil } - instance.Nets[uint(idnum)] = &netinfo + instance.Nets[uint(idnum)] = netinfo return nil } -func (instance *Instance) RebuildDevice(host Host, deviceid string) error { +func (instance *Instance) RebuildDevice(host *Host, deviceid string) error { instanceDevice, ok := instance.configHostPCIs[deviceid] if !ok { // if device does not exist return fmt.Errorf("%s not found in devices", deviceid) diff --git a/app/proxmox.go b/app/proxmox.go index 687ea7a..6b45a38 100644 --- a/app/proxmox.go +++ b/app/proxmox.go @@ -57,20 +57,20 @@ func (pve ProxmoxClient) Nodes() ([]string, error) { } // Gets a Node's resources but does not recursively expand instances -func (pve ProxmoxClient) Node(nodeName string) (Host, error) { +func (pve ProxmoxClient) Node(nodeName string) (*Host, error) { host := Host{} host.Devices = make(map[string]*Device) host.Instances = make(map[uint]*Instance) node, err := pve.client.Node(context.Background(), nodeName) if err != nil { - return host, err + return &host, err } devices := []Device{} err = pve.client.Get(context.Background(), fmt.Sprintf("/nodes/%s/hardware/pci", nodeName), &devices) if err != nil { - return host, err + return &host, err } for _, device := range devices { @@ -81,14 +81,14 @@ func (pve ProxmoxClient) Node(nodeName string) (Host, error) { host.Cores.Total = uint64(node.CPUInfo.CPUs) host.Memory.Total = uint64(node.Memory.Total) host.Swap.Total = uint64(node.Swap.Total) - host.node = node + host.pvenode = node - return host, err + return &host, err } // Get all VM IDs on specified host -func (host Host) VirtualMachines() ([]uint, error) { - vms, err := host.node.VirtualMachines(context.Background()) +func (host *Host) VirtualMachines() ([]uint, error) { + vms, err := host.pvenode.VirtualMachines(context.Background()) if err != nil { return nil, err } @@ -100,11 +100,11 @@ func (host Host) VirtualMachines() ([]uint, error) { } // Get a VM's CPU, Memory but does not recursively link Devices, Disks, Drives, Nets -func (host Host) VirtualMachine(VMID uint) (Instance, error) { +func (host *Host) VirtualMachine(VMID uint) (*Instance, error) { instance := Instance{} - vm, err := host.node.VirtualMachine(context.Background(), int(VMID)) + vm, err := host.pvenode.VirtualMachine(context.Background(), int(VMID)) if err != nil { - return instance, err + return &instance, err } config := vm.VirtualMachineConfig @@ -112,7 +112,7 @@ func (host Host) VirtualMachine(VMID uint) (Instance, error) { instance.configNets = config.MergeNets() instance.configDisks = MergeVMDisksAndUnused(config) - instance.config = config + instance.pveconfig = config instance.Type = VM instance.Name = vm.Name @@ -123,7 +123,7 @@ func (host Host) VirtualMachine(VMID uint) (Instance, error) { instance.Nets = make(map[uint]*Net) instance.Devices = make(map[uint][]*Device) - return instance, nil + return &instance, nil } func MergeVMDisksAndUnused(vmc *proxmox.VirtualMachineConfig) map[string]string { @@ -135,8 +135,8 @@ func MergeVMDisksAndUnused(vmc *proxmox.VirtualMachineConfig) map[string]string } // Get all CT IDs on specified host -func (host Host) Containers() ([]uint, error) { - cts, err := host.node.Containers(context.Background()) +func (host *Host) Containers() ([]uint, error) { + cts, err := host.pvenode.Containers(context.Background()) if err != nil { return nil, err } @@ -148,11 +148,11 @@ func (host Host) Containers() ([]uint, error) { } // Get a CT's CPU, Memory, Swap but does not recursively link Devices, Disks, Drives, Nets -func (host Host) Container(VMID uint) (Instance, error) { +func (host *Host) Container(VMID uint) (*Instance, error) { instance := Instance{} - ct, err := host.node.Container(context.Background(), int(VMID)) + ct, err := host.pvenode.Container(context.Background(), int(VMID)) if err != nil { - return instance, err + return &instance, err } config := ct.ContainerConfig @@ -160,7 +160,7 @@ func (host Host) Container(VMID uint) (Instance, error) { instance.configNets = config.MergeNets() instance.configDisks = MergeCTDisksAndUnused(config) - instance.config = config + instance.pveconfig = config instance.Type = CT instance.Name = ct.Name @@ -170,7 +170,7 @@ func (host Host) Container(VMID uint) (Instance, error) { instance.Volumes = make(map[string]*Volume) instance.Nets = make(map[uint]*Net) - return instance, nil + return &instance, nil } func MergeCTDisksAndUnused(cc *proxmox.ContainerConfig) map[string]string { @@ -186,19 +186,19 @@ func MergeCTDisksAndUnused(cc *proxmox.ContainerConfig) map[string]string { } // get volume fornmat, size, volumeid, and storageid from instance volume data string (eg: local:100/vm-100-disk-0.raw ... ) -func GetVolumeInfo(host Host, volume string) (Volume, string, string, error) { +func GetVolumeInfo(host *Host, volume string) (*Volume, string, string, error) { volumeData := Volume{} storageID := strings.Split(volume, ":")[0] volumeID := strings.Split(volume, ",")[0] - storage, err := host.node.Storage(context.Background(), storageID) + storage, err := host.pvenode.Storage(context.Background(), storageID) if err != nil { - return volumeData, volumeID, storageID, nil + return &volumeData, volumeID, storageID, nil } content, err := storage.GetContent(context.Background()) if err != nil { - return volumeData, volumeID, storageID, nil + return &volumeData, volumeID, storageID, nil } for _, c := range content { @@ -209,27 +209,27 @@ func GetVolumeInfo(host Host, volume string) (Volume, string, string, error) { } } - return volumeData, volumeID, storageID, nil + return &volumeData, volumeID, storageID, nil } -func GetNetInfo(net string) (Net, error) { +func GetNetInfo(net string) (*Net, error) { n := Net{} for _, val := range strings.Split(net, ",") { if strings.HasPrefix(val, "rate=") { rate, err := strconv.ParseUint(strings.TrimPrefix(val, "rate="), 10, 64) if err != nil { - return n, err + return &n, err } n.Rate = rate } else if strings.HasPrefix(val, "tag=") { vlan, err := strconv.ParseUint(strings.TrimPrefix(val, "tag="), 10, 64) if err != nil { - return n, err + return &n, err } n.VLAN = vlan } } - return n, nil + return &n, nil } diff --git a/app/types.go b/app/types.go index fc6984b..a5b9cc8 100644 --- a/app/types.go +++ b/app/types.go @@ -1,21 +1,26 @@ package app -import "github.com/luthermonson/go-proxmox" +import ( + "sync" -type Resource struct { - Reserved uint64 - Free uint64 - Total uint64 + "github.com/luthermonson/go-proxmox" +) + +type Cluster struct { + lock sync.Mutex + pve ProxmoxClient + Hosts map[string]*Host } type Host struct { + lock sync.Mutex Name string Cores Resource Memory Resource Swap Resource Devices map[string]*Device Instances map[uint]*Instance - node *proxmox.Node + pvenode *proxmox.Node } type InstanceType string @@ -26,6 +31,7 @@ const ( ) type Instance struct { + lock sync.Mutex Type InstanceType Name string Proctype string @@ -35,11 +41,16 @@ type Instance struct { Volumes map[string]*Volume Nets map[uint]*Net Devices map[uint][]*Device - config interface{} + pveconfig interface{} configDisks map[string]string configNets map[string]string configHostPCIs map[string]string - proxmox.ContainerInterface +} + +type Resource struct { + Reserved uint64 + Free uint64 + Total uint64 } type Volume struct { diff --git a/app/utils.go b/app/utils.go index b98775a..1d966b6 100644 --- a/app/utils.go +++ b/app/utils.go @@ -19,6 +19,7 @@ type Config struct { Secret string `json:"uuid"` } } + ReloadInterval int `json:"reloadInterval"` } func GetConfig(configPath string) Config {