How to create multiple workers to pick keys from a workqueue and process some business logic on them?

Problem Description

I am new to Golang and Kubernetes. I am trying to write a custom controller in Go using the client-go library. The controller connects to the Kubernetes API server, puts the details of the Pods into a cache, and sends them to a workqueue, where I perform some operations on the Pods. I want this process to be fast, and for that I need multiple workers. How can I create multiple workers that act on the same workqueue and speed up the code?

Here is a sample of my controller:

package main

import (
    "context"
    "flag"
    "fmt"
    "log"
    "time"

    "github.com/golang/glog"

    "k8s.io/apimachinery/pkg/watch"

    v1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

    rs "k8s.io/apimachinery/pkg/runtime"
    "k8s.io/apimachinery/pkg/util/runtime"
    "k8s.io/apimachinery/pkg/util/wait"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/tools/cache"
    "k8s.io/client-go/tools/clientcmd"
    "k8s.io/client-go/util/workqueue"
)

type Controller struct {
    clientset kubernetes.Interface
    queue     workqueue.RateLimitingInterface
    informer  cache.SharedIndexInformer
}

var (
    //flag pointing to the kubeconfig file
    kubeconfig = flag.String("kubeconfig", "location", "absolute path to the kubeconfig file")
)

// CreateSharedIndexInformer creates the SharedIndexInformer that brings the Pod details into the cache
func CreateSharedIndexInformer() {
    flag.Parse()
    //creating config using the kubeconfig file
    configuration, err := clientcmd.BuildConfigFromFlags("", *kubeconfig)
    if err != nil {
        log.Fatalf("Unable to build config from the kubeconfig file: %v", err)
    }

    cs, err := kubernetes.NewForConfig(configuration)
    if err != nil {
        log.Fatalf("Unable to create the clientset: %v", err)
    }

    //Creating the rate-limited work queue that the workers will consume
    queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())

    //Creating the SharedIndexInformer
    informer := cache.NewSharedIndexInformer(
        &cache.ListWatch{
            ListFunc: func(options metav1.ListOptions) (rs.Object, error) {
                return cs.CoreV1().Pods("").List(context.TODO(), options)
            },
            WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
                return cs.CoreV1().Pods("").Watch(context.TODO(), options)
            },
        },
        &v1.Pod{},
        time.Second*10, //resync the cache every 10 seconds (use 0 to disable resync)
        cache.Indexers{},
    )
    informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc: func(obj interface{}) {
            key, err := cache.MetaNamespaceKeyFunc(obj)
            if err == nil {
                queue.Add(key)
            }
        },
        DeleteFunc: func(obj interface{}) {
            key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
            if err == nil {
                queue.Add(key)
            }
        },
    })

    controller := &Controller{
        clientset: cs,
        queue:     queue,
        informer:  informer,
    }
    stop := make(chan struct{})
    go controller.Run(stop)

    // Wait forever
    select {}
}


func (c *Controller) Run(stopCh chan struct{}) {

    // don't let panics crash the process
    defer runtime.HandleCrash()
    // make sure the work queue is shutdown which will trigger workers to end
    defer c.queue.ShutDown()

    //c.logger.Info("Starting kubewatch controller")

    go c.informer.Run(stopCh)

    // wait for the caches to synchronize before starting the worker
    if !cache.WaitForCacheSync(stopCh, c.informer.HasSynced) {
        runtime.HandleError(fmt.Errorf("Timed out waiting for caches to sync"))
        return
    }

    //c.logger.Info("Kubewatch controller synced and ready")

    // runWorker will loop until "something bad" happens.  The .Until will
    // then rekick the worker after one second
    go wait.Until(c.runWorker, time.Second, stopCh)
    <-stopCh
}

func (c *Controller) runWorker() {
    // processNextItem will automatically wait until there's work available
    for c.processNextItem() {
        // continue looping
    }
}

// processNextItem deals with one key off the queue.  It returns false
// when it's time to quit.

func (c *Controller) processNextItem() bool {
    // pull the next work item from queue.  It should be a key we use to lookup
    // something in a cache
    key, quit := c.queue.Get()
    if quit {
        return false
    }

    // you always have to indicate to the queue that you've completed a piece of
    // work
    defer c.queue.Done(key)
    keyStr, ok := key.(string)
    if !ok {
        // As the item in the workqueue is actually invalid, we call
        // Forget here else we'd go into a loop of attempting to
        // process a work item that is invalid.
        c.queue.Forget(key)
        runtime.HandleError(fmt.Errorf("expected string in workqueue but got %#v", key))
        return true
    }

    // do your work on the key.
    err := c.processBusinessLogic(keyStr)

    if err == nil {
        // No error, tell the queue to stop tracking history
        c.queue.Forget(key)
    } else if c.queue.NumRequeues(key) < 10 {
        //c.logger.Errorf("Error processing %s (will retry): %v", key, err)
        // requeue the item to work on later
        c.queue.AddRateLimited(key)
    } else {
        // err != nil and too many retries
        //c.logger.Errorf("Error processing %s (giving up): %v", key, err)
        c.queue.Forget(key)
        runtime.HandleError(err)
    }

    return true
}

func (c *Controller) processBusinessLogic(key string) error {
    obj, exists, err := c.informer.GetIndexer().GetByKey(key)
    if err != nil {
        glog.Errorf("Fetching object with key %s from store failed with %v", key, err)
        return err
    }   

    if !exists {
        // The key is no longer in the cache, so the Pod has been deleted
        fmt.Printf("Pod %s does not exist anymore\n", key)
    } else {
        //Perform some business logic over the Pod

        // Note that you also have to check the uid if you have a local controlled resource, which
        // is dependent on the actual instance, to detect that a Pod was recreated with the same name
        fmt.Printf("Add event for Pod %s\n", obj.(*v1.Pod).GetName())
    }

    return nil
}

func (c *Controller) handleErr(err error, key interface{}) {
    glog.Infof("Dropping pod %q out of the queue: %v", key, err)
}

func main() {
    CreateSharedIndexInformer()
}

Tags: go, kubernetes, controller, client-go

Solution


You can add more workers in the Run function, as shown below:

func (c *Controller) Run(stopCh chan struct{}) {
    ...
    // runWorker will loop until "something bad" happens.  The .Until will
    // then rekick the worker after one second
    for i := 0; i < 5; i++ {
        go wait.Until(c.runWorker, time.Second, stopCh)
    }
    <-stopCh
}
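
If you also want the worker count to be configurable and want the controller to wait for the workers to stop before returning, a variant of Run along the following lines should work. This is only a sketch: it reuses the Controller type, the imports and runWorker from the question, additionally needs the standard library "sync" package, and the RunWithWorkers name and the workers parameter are made up for illustration.

// A sketch only: assumes the Controller type and imports from the question,
// plus the standard library "sync" package. RunWithWorkers and the workers
// parameter are illustrative names, not part of client-go.
func (c *Controller) RunWithWorkers(workers int, stopCh chan struct{}) {
    // don't let panics crash the process
    defer runtime.HandleCrash()

    go c.informer.Run(stopCh)

    // wait for the caches to synchronize before starting any worker
    if !cache.WaitForCacheSync(stopCh, c.informer.HasSynced) {
        runtime.HandleError(fmt.Errorf("timed out waiting for caches to sync"))
        return
    }

    var wg sync.WaitGroup
    for i := 0; i < workers; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            // every worker loops over the same queue; the workqueue guarantees
            // that a given key is not handed to two workers at the same time
            wait.Until(c.runWorker, time.Second, stopCh)
        }()
    }

    <-stopCh
    // shutting down the queue unblocks workers waiting in queue.Get(),
    // so runWorker (and therefore wait.Until) can return
    c.queue.ShutDown()
    wg.Wait()
}

CreateSharedIndexInformer would then start it with something like go controller.RunWithWorkers(5, stop) instead of go controller.Run(stop). Because every key handed out by Get() must be marked with Done(), and the workqueue never gives the same key to two workers concurrently, the workers need no extra locking around the queue itself.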
