From f1deacee1044fd82acefddfcfb68056d5260227f Mon Sep 17 00:00:00 2001 From: Arthur Lu Date: Mon, 1 Jun 2026 21:17:51 +0000 Subject: [PATCH] improve sync latency significantly by using goroutines --- app/app.go | 7 +- app/model.go | 398 +++++++++++++++++++++++++-------------------------- go.mod | 1 + 3 files changed, 204 insertions(+), 202 deletions(-) diff --git a/app/app.go b/app/app.go index 7047b92..18b7873 100644 --- a/app/app.go +++ b/app/app.go @@ -39,7 +39,7 @@ func Run() { if err != nil { log.Printf("[Error] error encountered while syncing cluster: %s", err) } else { - log.Printf("[INFO] synced cluster in %fs\n", time.Since(start).Seconds()) + log.Printf("[INFO] synced cluster in %d ms\n", time.Since(start).Milliseconds()) } // set repeating update for full rebuilds @@ -56,10 +56,11 @@ func Run() { log.Printf("[INFO] starting cluster sync\n") err := cluster.Sync() if err != nil { - log.Printf("[Error] error encountered while syncing cluster: %s", err) + log.Printf("[ERR ] error encountered while syncing cluster: %s", err) } else { - log.Printf("[INFO] synced cluster in %fs\n", time.Since(start).Seconds()) + log.Printf("[INFO] synced cluster in %d ms", time.Since(start).Milliseconds()) } + } } }() diff --git a/app/model.go b/app/model.go index 47c4422..fce094d 100644 --- a/app/model.go +++ b/app/model.go @@ -5,8 +5,11 @@ import ( "fmt" "log" "strings" + "time" paas "proxmoxaas-common-lib" + + "golang.org/x/sync/errgroup" ) func (cluster *Cluster) Init(pve ProxmoxClient) { @@ -14,287 +17,284 @@ func (cluster *Cluster) Init(pve ProxmoxClient) { } func (cluster *Cluster) Get() (*Cluster, error) { - cluster_ch := make(chan *Cluster) - err_ch := make(chan error) + // aquire cluster lock + cluster.lock.Lock() + defer cluster.lock.Unlock() - go func() { - // aquire cluster lock - cluster.lock.Lock() - defer cluster.lock.Unlock() - cluster_ch <- cluster - err_ch <- nil - }() - - return <-cluster_ch, <-err_ch + return cluster, nil } // hard sync cluster func (cluster *Cluster) Sync() error { - err_ch := make(chan error) + // aquire lock on cluster, release on return + cluster.lock.Lock() - go func() { - // aquire lock on cluster, release on return - cluster.lock.Lock() + cluster.Nodes = make(map[string]*Node) - cluster.Nodes = make(map[string]*Node) + wg, _ := errgroup.WithContext(context.Background()) - // get all nodes - nodes, err := cluster.pve.Nodes() - if err != nil { - err_ch <- err - return - } - // for each node: - for _, hostName := range nodes { + // get all nodes + nodes, err := cluster.pve.Nodes() + if err != nil { + return err + } + + // for each node: + for _, hostName := range nodes { + wg.Go(func() error { + start := time.Now() // rebuild node err := cluster.RebuildNode(hostName) if err != nil { // if an error was encountered, continue and log the error - log.Printf("[ERR ] %s", err) - } else { // otherwise log success - log.Printf("[INFO] successfully synced node %s", hostName) + log.Printf("[ERR ] error encountered while syncing node %s: %s", hostName, err) + } else { + log.Printf("[INFO] synced node %s in %d ms", hostName, time.Since(start).Milliseconds()) } - } + return err + }) + } - cluster.lock.Unlock() + err = wg.Wait() + if err != nil { + return err + } - err = cluster.ResolvePoolMembership() - if err != nil { - err_ch <- err - } + cluster.lock.Unlock() - err_ch <- nil - }() + err = cluster.ResolvePoolMembership() + if err != nil { + return err + } - return <-err_ch + return nil } func (cluster *Cluster) ResolvePoolMembership() error { - err_ch := make(chan error) + // aquire lock on cluster, release on return + cluster.lock.Lock() - go func() { - // aquire lock on cluster, release on return - cluster.lock.Lock() - - //resolve pool membership - pools, err := cluster.pve.client.Pools(context.Background()) + //resolve pool membership + pools, err := cluster.pve.client.Pools(context.Background()) + if err != nil { + return err + } + for _, pool := range pools { + pool, err = cluster.pve.client.Pool(context.Background(), pool.PoolID) if err != nil { - err_ch <- err + return err } - for _, pool := range pools { - pool, err = cluster.pve.client.Pool(context.Background(), pool.PoolID) - if err != nil { - err_ch <- err - } - for _, member := range pool.Members { - if member.Type == "lxc" || member.Type == "qemu" { - node, ok := cluster.Nodes[member.Node] - if !ok { - err_ch <- fmt.Errorf("Instance %d has no node", member.VMID) - } - instance, ok := node.Instances[InstanceID(member.VMID)] - if !ok { - err_ch <- fmt.Errorf("Instance %d claimed to be in node %s but was not", member.VMID, node.Name) - } - instance.Pool = pool.PoolID - log.Printf("[INFO] successfully resolved pool membership for vmid=%d pool=%s", member.VMID, pool.PoolID) + for _, member := range pool.Members { + if member.Type == "lxc" || member.Type == "qemu" { + node, ok := cluster.Nodes[member.Node] + if !ok { + return fmt.Errorf("Instance %d has no node", member.VMID) } + instance, ok := node.Instances[InstanceID(member.VMID)] + if !ok { + return fmt.Errorf("Instance %d claimed to be in node %s but was not", member.VMID, node.Name) + } + instance.Pool = pool.PoolID + log.Printf("[INFO] resolved pool membership for vmid=%d pool=%s", member.VMID, pool.PoolID) } } + } - err_ch <- nil - - cluster.lock.Unlock() - }() - - return <-err_ch + cluster.lock.Unlock() + return nil } // get a node in the cluster func (cluster *Cluster) GetNode(hostName string) (*Node, error) { - host_ch := make(chan *Node) - err_ch := make(chan error) + // aquire cluster lock + cluster.lock.Lock() + defer cluster.lock.Unlock() + // get host + host, ok := cluster.Nodes[hostName] + if !ok { + return nil, fmt.Errorf("%s not in cluster", hostName) + } else { + // aquire host lock to wait in case of a concurrent write + host.lock.Lock() + defer host.lock.Unlock() - go func() { - // aquire cluster lock - cluster.lock.Lock() - defer cluster.lock.Unlock() - // get host - host, ok := cluster.Nodes[hostName] - if !ok { - host_ch <- nil - err_ch <- fmt.Errorf("%s not in cluster", hostName) - } else { - // aquire host lock to wait in case of a concurrent write - host.lock.Lock() - defer host.lock.Unlock() - - host_ch <- host - err_ch <- nil - } - }() - - return <-host_ch, <-err_ch + return host, nil + } } // hard sync node // returns error if the node could not be reached func (cluster *Cluster) RebuildNode(hostName string) error { - err_ch := make(chan error) + host, err := cluster.pve.Node(hostName) + if err != nil && cluster.Nodes[hostName] == nil { // host is unreachable and did not exist previously + // return an error because we requested to sync a node that was not already in the cluster + return fmt.Errorf("error retrieving %s: %s", hostName, err.Error()) + } - go func() { - host, err := cluster.pve.Node(hostName) - if err != nil && cluster.Nodes[hostName] == nil { // host is unreachable and did not exist previously - // return an error because we requested to sync a node that was not already in the cluster - err_ch <- fmt.Errorf("error retrieving %s: %s", hostName, err.Error()) - } + // aquire lock on host, release on return + host.lock.Lock() + defer host.lock.Unlock() - // aquire lock on host, release on return - host.lock.Lock() - defer host.lock.Unlock() + wg, _ := errgroup.WithContext(context.Background()) - if err != nil && cluster.Nodes[hostName] != nil { // host is unreachable and did exist previously - // assume the node is down or gone and delete from cluster - delete(cluster.Nodes, hostName) - err_ch <- nil - } + if err != nil && cluster.Nodes[hostName] != nil { // host is unreachable and did exist previously + // assume the node is down or gone and delete from cluster + delete(cluster.Nodes, hostName) + return nil + } - cluster.Nodes[hostName] = host + cluster.Nodes[hostName] = host - // get node's VMs - vms, err := host.VirtualMachines() - if err != nil { - err_ch <- err + // get node's VMs + vms, err := host.VirtualMachines() + if err != nil { + return err - } - for _, vmid := range vms { + } + for _, vmid := range vms { + wg.Go(func() error { + start := time.Now() err := host.RebuildInstance(VM, vmid) if err != nil { // if an error was encountered, continue and log the error - log.Printf("[ERR ] %s", err) + log.Printf("[ERR ] error encountered while syncing vm %s.%d: %s", hostName, vmid, err) } else { - log.Printf("[INFO] successfully synced vm %s.%d", hostName, vmid) + log.Printf("[INFO] synced vm %s.%d in %d ms", hostName, vmid, time.Since(start).Milliseconds()) } - } + return err + }) + } - // get node's CTs - cts, err := host.Containers() - if err != nil { - err_ch <- err - } - for _, vmid := range cts { + // get node's CTs + cts, err := host.Containers() + if err != nil { + return err + } + for _, vmid := range cts { + wg.Go(func() error { + start := time.Now() err := host.RebuildInstance(CT, vmid) if err != nil { // if an error was encountered, continue and log the error - log.Printf("[ERR ] %s", err) + log.Printf("[ERR ] error encountered while syncing ct %s.%d: %s", hostName, vmid, err) } else { - log.Printf("[INFO] successfully synced ct %s.%d", hostName, vmid) - } - } + log.Printf("[INFO] synced ct %s.%d in %d ms", hostName, vmid, time.Since(start).Milliseconds()) - // check node device reserved by iterating over each function, we will assume that a single reserved function means the device is also reserved - for _, device := range host.Devices { - reserved := false - for _, function := range device.Functions { - reserved = reserved || function.Reserved } - device.Reserved = reserved - } + return err + }) + } - err_ch <- nil - }() - return <-err_ch + err = wg.Wait() + if err != nil { + return err + } + + // check node device reserved by iterating over each function, we will assume that a single reserved function means the device is also reserved + for _, device := range host.Devices { + reserved := false + for _, function := range device.Functions { + reserved = reserved || function.Reserved + } + device.Reserved = reserved + } + + return nil } func (host *Node) GetInstance(vmid uint) (*Instance, error) { - instance_ch := make(chan *Instance) - err_ch := make(chan error) - go func() { - // aquire host lock - host.lock.Lock() - defer host.lock.Unlock() - // get instance - instance, ok := host.Instances[InstanceID(vmid)] - if !ok { - instance_ch <- nil - err_ch <- fmt.Errorf("vmid %d not in host %s", vmid, host.Name) - } else { - // aquire instance lock to wait in case of a concurrent write - instance.lock.Lock() - defer instance.lock.Unlock() + // aquire host lock + host.lock.Lock() + defer host.lock.Unlock() + // get instance + instance, ok := host.Instances[InstanceID(vmid)] + if !ok { + return nil, fmt.Errorf("vmid %d not in host %s", vmid, host.Name) + } else { + // aquire instance lock to wait in case of a concurrent write + instance.lock.Lock() + defer instance.lock.Unlock() - instance_ch <- instance - err_ch <- nil - } - }() - - return <-instance_ch, <-err_ch + return instance, nil + } } // hard sync instance // returns error if the instance could not be reached func (host *Node) RebuildInstance(instancetype InstanceType, vmid uint) error { - err_ch := make(chan error) + instanceID := InstanceID(vmid) + var instance *Instance + var err error + switch instancetype { + case VM: + instance, err = host.VirtualMachine(vmid) + case CT: + instance, err = host.Container(vmid) - go func() { - instanceID := InstanceID(vmid) - var instance *Instance - var err error - switch instancetype { - case VM: - instance, err = host.VirtualMachine(vmid) - case CT: - instance, err = host.Container(vmid) + } - } + if err != nil && host.Instances[instanceID] == nil { // instance is unreachable and did not exist previously + // return an error because we requested to sync an instance that was not already in the cluster + return fmt.Errorf("error retrieving %s.%d: %s", host.Name, instanceID, err.Error()) + } - if err != nil && host.Instances[instanceID] == nil { // instance is unreachable and did not exist previously - // return an error because we requested to sync an instance that was not already in the cluster - err_ch <- fmt.Errorf("error retrieving %s.%d: %s", host.Name, instanceID, err.Error()) - } + // aquire lock on instance, release on return + instance.lock.Lock() + defer instance.lock.Unlock() - // aquire lock on instance, release on return - instance.lock.Lock() - defer instance.lock.Unlock() + wg, _ := errgroup.WithContext(context.Background()) - if err != nil && host.Instances[instanceID] != nil { // host is unreachable and did exist previously - // assume the instance is gone and delete from cluster - delete(host.Instances, instanceID) - err_ch <- nil - } + if err != nil && host.Instances[instanceID] != nil { // host is unreachable and did exist previously + // assume the instance is gone and delete from cluster + delete(host.Instances, instanceID) + return nil + } - host.Instances[instanceID] = instance + host.Instances[instanceID] = instance - for volid := range instance.configDisks { + for volid := range instance.configDisks { + wg.Go(func() error { err = instance.RebuildVolume(host, volid) if err != nil { - err_ch <- err + log.Printf("[ERR ] error rebuilding volume %s: %s", volid, err) } - } + return err + }) + } - for netid := range instance.configNets { + for netid := range instance.configNets { + wg.Go(func() error { err = instance.RebuildNet(host, netid) if err != nil { - err_ch <- err + log.Printf("[ERR ] error rebuilding net %s: %s", netid, err) + return err } - } + return err + }) + } - for deviceid := range instance.configHostPCIs { + for deviceid := range instance.configHostPCIs { + wg.Go(func() error { err = instance.RebuildDevice(host, deviceid) if err != nil { - err_ch <- err + log.Printf("[ERR ] error rebuilding pci %s: %s", deviceid, err) } + return err + }) + } + + err = wg.Wait() + if err != nil { + return err + } + + if instance.Type == VM { + err = instance.RebuildBoot(host) + if err != nil { + log.Printf("[ERR ] error rebuilding boot: %s", err) } - - if instance.Type == VM { - err = instance.RebuildBoot(host) - if err != nil { - err_ch <- err - } - } - - err_ch <- nil - }() - - return <-err_ch + return err + } else { + return nil + } } func (instance *Instance) RebuildVolume(host *Node, volid string) error { diff --git a/go.mod b/go.mod index 440a7aa..b520337 100644 --- a/go.mod +++ b/go.mod @@ -5,6 +5,7 @@ go 1.26.2 require ( github.com/gin-gonic/gin v1.12.0 github.com/luthermonson/go-proxmox v0.7.0 + golang.org/x/sync v0.20.0 proxmoxaas-common-lib v0.0.0 )