add basic synchronization using mutex

This commit is contained in:
Arthur Lu 2025-01-20 21:42:13 +00:00
parent 229ab36ede
commit 58cf403d26
5 changed files with 158 additions and 68 deletions

View File

@ -34,10 +34,14 @@ func Run() {
cluster := Cluster{} cluster := Cluster{}
cluster.Init(client) cluster.Init(client)
cluster.Rebuild() start := time.Now()
log.Printf("Starting cluster sync\n")
cluster.Sync()
log.Printf("Synced cluster in %fs\n", time.Since(start).Seconds())
// set repeating update for full rebuilds // set repeating update for full rebuilds
ticker := time.NewTicker(5 * time.Second) ticker := time.NewTicker(time.Duration(config.ReloadInterval) * time.Second)
log.Printf("Initialized cluster sync interval of %ds", config.ReloadInterval)
channel := make(chan bool) channel := make(chan bool)
go func() { go func() {
for { for {
@ -45,8 +49,10 @@ func Run() {
case <-channel: case <-channel:
return return
case <-ticker.C: case <-ticker.C:
cluster.Rebuild() start := time.Now()
log.Printf("rebuilt cluster\n") log.Printf("Starting cluster sync\n")
cluster.Sync()
log.Printf("Synced cluster in %fs\n", time.Since(start).Seconds())
} }
} }
}() }()
@ -62,34 +68,38 @@ func Run() {
router.GET("/nodes/:node", func(c *gin.Context) { router.GET("/nodes/:node", func(c *gin.Context) {
node := c.Param("node") node := c.Param("node")
Host, ok := cluster.Hosts[node]
if !ok { host, err := cluster.GetHost(node)
c.JSON(http.StatusInternalServerError, gin.H{"error": fmt.Sprintf("%s not found in cluster", node)})
if err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": err})
return return
} else { } else {
c.JSON(http.StatusOK, gin.H{"node": Host}) c.JSON(http.StatusOK, gin.H{"node": host})
return return
} }
}) })
router.GET("/nodes/:node/instances/:instance", func(c *gin.Context) { router.GET("/nodes/:node/instances/:instance", func(c *gin.Context) {
host := c.Param("node") node := c.Param("node")
vmid, err := strconv.ParseUint(c.Param("instance"), 10, 64) vmid, err := strconv.ParseUint(c.Param("instance"), 10, 64)
if err != nil { if err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": fmt.Sprintf("%s could not be converted to vmid (uint)", c.Param("instance"))}) c.JSON(http.StatusInternalServerError, gin.H{"error": fmt.Sprintf("%s could not be converted to vmid (uint)", c.Param("instance"))})
return return
} }
Node, ok := cluster.Hosts[host]
if !ok { host, err := cluster.GetHost(node)
c.JSON(http.StatusInternalServerError, gin.H{"error": fmt.Sprintf("vmid %s not found in cluster", host)})
if err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": fmt.Sprintf("vmid %s not found in cluster", node)})
return return
} else { } else {
Instance, ok := Node.Instances[uint(vmid)] instance, err := host.GetInstance(uint(vmid))
if !ok { if err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": fmt.Sprintf("%d not found in %s", vmid, host)}) c.JSON(http.StatusInternalServerError, gin.H{"error": fmt.Sprintf("%d not found in %s", vmid, node)})
return return
} else { } else {
c.JSON(http.StatusOK, gin.H{"instance": Instance}) c.JSON(http.StatusOK, gin.H{"instance": instance})
return return
} }
} }

View File

@ -6,16 +6,15 @@ import (
"strings" "strings"
) )
type Cluster struct {
pve ProxmoxClient
Hosts map[string]*Host
}
func (cluster *Cluster) Init(pve ProxmoxClient) { func (cluster *Cluster) Init(pve ProxmoxClient) {
cluster.pve = pve cluster.pve = pve
} }
func (cluster *Cluster) Rebuild() error { func (cluster *Cluster) Sync() error {
// aquire lock on cluster, release on return
cluster.lock.Lock()
defer cluster.lock.Unlock()
cluster.Hosts = make(map[string]*Host) cluster.Hosts = make(map[string]*Host)
// get all nodes // get all nodes
@ -26,7 +25,7 @@ func (cluster *Cluster) Rebuild() error {
// for each node: // for each node:
for _, hostName := range nodes { for _, hostName := range nodes {
// rebuild node // rebuild node
err := cluster.RebuildNode(hostName) err := cluster.RebuildHost(hostName)
if err != nil { if err != nil {
return err return err
} }
@ -35,17 +34,51 @@ func (cluster *Cluster) Rebuild() error {
return nil return nil
} }
func (cluster *Cluster) RebuildNode(hostName string) error { // get a node in the cluster
func (cluster *Cluster) GetHost(hostName string) (*Host, error) {
host_ch := make(chan *Host)
err_ch := make(chan error)
go func() {
// aquire cluster lock
cluster.lock.Lock()
defer cluster.lock.Unlock()
// get host
host, ok := cluster.Hosts[hostName]
if !ok {
host_ch <- nil
err_ch <- fmt.Errorf("%s not in cluster", hostName)
}
// aquire host lock to wait in case of a concurrent write
host.lock.Lock()
defer host.lock.Unlock()
host_ch <- host
err_ch <- nil
}()
host := <-host_ch
err := <-err_ch
return host, err
}
func (cluster *Cluster) RebuildHost(hostName string) error {
host, err := cluster.pve.Node(hostName) host, err := cluster.pve.Node(hostName)
if err != nil { if err != nil {
return err return err
} }
cluster.Hosts[hostName] = &host
// aquire lock on host, release on return
host.lock.Lock()
defer host.lock.Unlock()
cluster.Hosts[hostName] = host
// get node's VMs // get node's VMs
vms, err := host.VirtualMachines() vms, err := host.VirtualMachines()
if err != nil { if err != nil {
return err return err
} }
for _, vmid := range vms { for _, vmid := range vms {
err := host.RebuildVM(vmid) err := host.RebuildVM(vmid)
@ -69,13 +102,44 @@ func (cluster *Cluster) RebuildNode(hostName string) error {
return nil return nil
} }
func (host *Host) GetInstance(vmid uint) (*Instance, error) {
instance_ch := make(chan *Instance)
err_ch := make(chan error)
go func() {
// aquire host lock
host.lock.Lock()
defer host.lock.Unlock()
// get instance
instance, ok := host.Instances[vmid]
if !ok {
instance_ch <- nil
err_ch <- fmt.Errorf("vmid %d not in host %s", vmid, host.Name)
}
// aquire instance lock to wait in case of a concurrent write
instance.lock.Lock()
defer instance.lock.Unlock()
instance_ch <- instance
err_ch <- nil
}()
instance := <-instance_ch
err := <-err_ch
return instance, err
}
func (host *Host) RebuildVM(vmid uint) error { func (host *Host) RebuildVM(vmid uint) error {
instance, err := host.VirtualMachine(vmid) instance, err := host.VirtualMachine(vmid)
if err != nil { if err != nil {
return err return err
} }
host.Instances[vmid] = &instance // aquire lock on instance, release on return
instance.lock.Lock()
defer instance.lock.Unlock()
host.Instances[vmid] = instance
for volid := range instance.configDisks { for volid := range instance.configDisks {
instance.RebuildVolume(host, volid) instance.RebuildVolume(host, volid)
@ -86,7 +150,7 @@ func (host *Host) RebuildVM(vmid uint) error {
} }
for deviceid := range instance.configHostPCIs { for deviceid := range instance.configHostPCIs {
instance.RebuildDevice(*host, deviceid) instance.RebuildDevice(host, deviceid)
} }
return nil return nil
@ -98,7 +162,11 @@ func (host *Host) RebuildCT(vmid uint) error {
return err return err
} }
host.Instances[vmid] = &instance // aquire lock on instance, release on return
instance.lock.Lock()
defer instance.lock.Unlock()
host.Instances[vmid] = instance
for volid := range instance.configDisks { for volid := range instance.configDisks {
instance.RebuildVolume(host, volid) instance.RebuildVolume(host, volid)
@ -114,12 +182,12 @@ func (host *Host) RebuildCT(vmid uint) error {
func (instance *Instance) RebuildVolume(host *Host, volid string) error { func (instance *Instance) RebuildVolume(host *Host, volid string) error {
volumeDataString := instance.configDisks[volid] volumeDataString := instance.configDisks[volid]
volume, _, _, err := GetVolumeInfo(*host, volumeDataString) volume, _, _, err := GetVolumeInfo(host, volumeDataString)
if err != nil { if err != nil {
return err return err
} }
instance.Volumes[volid] = &volume instance.Volumes[volid] = volume
return nil return nil
} }
@ -136,12 +204,12 @@ func (instance *Instance) RebuildNet(netid string) error {
return nil return nil
} }
instance.Nets[uint(idnum)] = &netinfo instance.Nets[uint(idnum)] = netinfo
return nil return nil
} }
func (instance *Instance) RebuildDevice(host Host, deviceid string) error { func (instance *Instance) RebuildDevice(host *Host, deviceid string) error {
instanceDevice, ok := instance.configHostPCIs[deviceid] instanceDevice, ok := instance.configHostPCIs[deviceid]
if !ok { // if device does not exist if !ok { // if device does not exist
return fmt.Errorf("%s not found in devices", deviceid) return fmt.Errorf("%s not found in devices", deviceid)

View File

@ -57,20 +57,20 @@ func (pve ProxmoxClient) Nodes() ([]string, error) {
} }
// Gets a Node's resources but does not recursively expand instances // Gets a Node's resources but does not recursively expand instances
func (pve ProxmoxClient) Node(nodeName string) (Host, error) { func (pve ProxmoxClient) Node(nodeName string) (*Host, error) {
host := Host{} host := Host{}
host.Devices = make(map[string]*Device) host.Devices = make(map[string]*Device)
host.Instances = make(map[uint]*Instance) host.Instances = make(map[uint]*Instance)
node, err := pve.client.Node(context.Background(), nodeName) node, err := pve.client.Node(context.Background(), nodeName)
if err != nil { if err != nil {
return host, err return &host, err
} }
devices := []Device{} devices := []Device{}
err = pve.client.Get(context.Background(), fmt.Sprintf("/nodes/%s/hardware/pci", nodeName), &devices) err = pve.client.Get(context.Background(), fmt.Sprintf("/nodes/%s/hardware/pci", nodeName), &devices)
if err != nil { if err != nil {
return host, err return &host, err
} }
for _, device := range devices { for _, device := range devices {
@ -81,14 +81,14 @@ func (pve ProxmoxClient) Node(nodeName string) (Host, error) {
host.Cores.Total = uint64(node.CPUInfo.CPUs) host.Cores.Total = uint64(node.CPUInfo.CPUs)
host.Memory.Total = uint64(node.Memory.Total) host.Memory.Total = uint64(node.Memory.Total)
host.Swap.Total = uint64(node.Swap.Total) host.Swap.Total = uint64(node.Swap.Total)
host.node = node host.pvenode = node
return host, err return &host, err
} }
// Get all VM IDs on specified host // Get all VM IDs on specified host
func (host Host) VirtualMachines() ([]uint, error) { func (host *Host) VirtualMachines() ([]uint, error) {
vms, err := host.node.VirtualMachines(context.Background()) vms, err := host.pvenode.VirtualMachines(context.Background())
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -100,11 +100,11 @@ func (host Host) VirtualMachines() ([]uint, error) {
} }
// Get a VM's CPU, Memory but does not recursively link Devices, Disks, Drives, Nets // Get a VM's CPU, Memory but does not recursively link Devices, Disks, Drives, Nets
func (host Host) VirtualMachine(VMID uint) (Instance, error) { func (host *Host) VirtualMachine(VMID uint) (*Instance, error) {
instance := Instance{} instance := Instance{}
vm, err := host.node.VirtualMachine(context.Background(), int(VMID)) vm, err := host.pvenode.VirtualMachine(context.Background(), int(VMID))
if err != nil { if err != nil {
return instance, err return &instance, err
} }
config := vm.VirtualMachineConfig config := vm.VirtualMachineConfig
@ -112,7 +112,7 @@ func (host Host) VirtualMachine(VMID uint) (Instance, error) {
instance.configNets = config.MergeNets() instance.configNets = config.MergeNets()
instance.configDisks = MergeVMDisksAndUnused(config) instance.configDisks = MergeVMDisksAndUnused(config)
instance.config = config instance.pveconfig = config
instance.Type = VM instance.Type = VM
instance.Name = vm.Name instance.Name = vm.Name
@ -123,7 +123,7 @@ func (host Host) VirtualMachine(VMID uint) (Instance, error) {
instance.Nets = make(map[uint]*Net) instance.Nets = make(map[uint]*Net)
instance.Devices = make(map[uint][]*Device) instance.Devices = make(map[uint][]*Device)
return instance, nil return &instance, nil
} }
func MergeVMDisksAndUnused(vmc *proxmox.VirtualMachineConfig) map[string]string { func MergeVMDisksAndUnused(vmc *proxmox.VirtualMachineConfig) map[string]string {
@ -135,8 +135,8 @@ func MergeVMDisksAndUnused(vmc *proxmox.VirtualMachineConfig) map[string]string
} }
// Get all CT IDs on specified host // Get all CT IDs on specified host
func (host Host) Containers() ([]uint, error) { func (host *Host) Containers() ([]uint, error) {
cts, err := host.node.Containers(context.Background()) cts, err := host.pvenode.Containers(context.Background())
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -148,11 +148,11 @@ func (host Host) Containers() ([]uint, error) {
} }
// Get a CT's CPU, Memory, Swap but does not recursively link Devices, Disks, Drives, Nets // Get a CT's CPU, Memory, Swap but does not recursively link Devices, Disks, Drives, Nets
func (host Host) Container(VMID uint) (Instance, error) { func (host *Host) Container(VMID uint) (*Instance, error) {
instance := Instance{} instance := Instance{}
ct, err := host.node.Container(context.Background(), int(VMID)) ct, err := host.pvenode.Container(context.Background(), int(VMID))
if err != nil { if err != nil {
return instance, err return &instance, err
} }
config := ct.ContainerConfig config := ct.ContainerConfig
@ -160,7 +160,7 @@ func (host Host) Container(VMID uint) (Instance, error) {
instance.configNets = config.MergeNets() instance.configNets = config.MergeNets()
instance.configDisks = MergeCTDisksAndUnused(config) instance.configDisks = MergeCTDisksAndUnused(config)
instance.config = config instance.pveconfig = config
instance.Type = CT instance.Type = CT
instance.Name = ct.Name instance.Name = ct.Name
@ -170,7 +170,7 @@ func (host Host) Container(VMID uint) (Instance, error) {
instance.Volumes = make(map[string]*Volume) instance.Volumes = make(map[string]*Volume)
instance.Nets = make(map[uint]*Net) instance.Nets = make(map[uint]*Net)
return instance, nil return &instance, nil
} }
func MergeCTDisksAndUnused(cc *proxmox.ContainerConfig) map[string]string { func MergeCTDisksAndUnused(cc *proxmox.ContainerConfig) map[string]string {
@ -186,19 +186,19 @@ func MergeCTDisksAndUnused(cc *proxmox.ContainerConfig) map[string]string {
} }
// get volume fornmat, size, volumeid, and storageid from instance volume data string (eg: local:100/vm-100-disk-0.raw ... ) // get volume fornmat, size, volumeid, and storageid from instance volume data string (eg: local:100/vm-100-disk-0.raw ... )
func GetVolumeInfo(host Host, volume string) (Volume, string, string, error) { func GetVolumeInfo(host *Host, volume string) (*Volume, string, string, error) {
volumeData := Volume{} volumeData := Volume{}
storageID := strings.Split(volume, ":")[0] storageID := strings.Split(volume, ":")[0]
volumeID := strings.Split(volume, ",")[0] volumeID := strings.Split(volume, ",")[0]
storage, err := host.node.Storage(context.Background(), storageID) storage, err := host.pvenode.Storage(context.Background(), storageID)
if err != nil { if err != nil {
return volumeData, volumeID, storageID, nil return &volumeData, volumeID, storageID, nil
} }
content, err := storage.GetContent(context.Background()) content, err := storage.GetContent(context.Background())
if err != nil { if err != nil {
return volumeData, volumeID, storageID, nil return &volumeData, volumeID, storageID, nil
} }
for _, c := range content { for _, c := range content {
@ -209,27 +209,27 @@ func GetVolumeInfo(host Host, volume string) (Volume, string, string, error) {
} }
} }
return volumeData, volumeID, storageID, nil return &volumeData, volumeID, storageID, nil
} }
func GetNetInfo(net string) (Net, error) { func GetNetInfo(net string) (*Net, error) {
n := Net{} n := Net{}
for _, val := range strings.Split(net, ",") { for _, val := range strings.Split(net, ",") {
if strings.HasPrefix(val, "rate=") { if strings.HasPrefix(val, "rate=") {
rate, err := strconv.ParseUint(strings.TrimPrefix(val, "rate="), 10, 64) rate, err := strconv.ParseUint(strings.TrimPrefix(val, "rate="), 10, 64)
if err != nil { if err != nil {
return n, err return &n, err
} }
n.Rate = rate n.Rate = rate
} else if strings.HasPrefix(val, "tag=") { } else if strings.HasPrefix(val, "tag=") {
vlan, err := strconv.ParseUint(strings.TrimPrefix(val, "tag="), 10, 64) vlan, err := strconv.ParseUint(strings.TrimPrefix(val, "tag="), 10, 64)
if err != nil { if err != nil {
return n, err return &n, err
} }
n.VLAN = vlan n.VLAN = vlan
} }
} }
return n, nil return &n, nil
} }

View File

@ -1,21 +1,26 @@
package app package app
import "github.com/luthermonson/go-proxmox" import (
"sync"
type Resource struct { "github.com/luthermonson/go-proxmox"
Reserved uint64 )
Free uint64
Total uint64 type Cluster struct {
lock sync.Mutex
pve ProxmoxClient
Hosts map[string]*Host
} }
type Host struct { type Host struct {
lock sync.Mutex
Name string Name string
Cores Resource Cores Resource
Memory Resource Memory Resource
Swap Resource Swap Resource
Devices map[string]*Device Devices map[string]*Device
Instances map[uint]*Instance Instances map[uint]*Instance
node *proxmox.Node pvenode *proxmox.Node
} }
type InstanceType string type InstanceType string
@ -26,6 +31,7 @@ const (
) )
type Instance struct { type Instance struct {
lock sync.Mutex
Type InstanceType Type InstanceType
Name string Name string
Proctype string Proctype string
@ -35,11 +41,16 @@ type Instance struct {
Volumes map[string]*Volume Volumes map[string]*Volume
Nets map[uint]*Net Nets map[uint]*Net
Devices map[uint][]*Device Devices map[uint][]*Device
config interface{} pveconfig interface{}
configDisks map[string]string configDisks map[string]string
configNets map[string]string configNets map[string]string
configHostPCIs map[string]string configHostPCIs map[string]string
proxmox.ContainerInterface }
type Resource struct {
Reserved uint64
Free uint64
Total uint64
} }
type Volume struct { type Volume struct {

View File

@ -19,6 +19,7 @@ type Config struct {
Secret string `json:"uuid"` Secret string `json:"uuid"`
} }
} }
ReloadInterval int `json:"reloadInterval"`
} }
func GetConfig(configPath string) Config { func GetConfig(configPath string) Config {