首页 > 解决方案 > 调度员不解雇

问题描述

我正在尝试基于http://marcio.io/2015/07/handling-1-million-requests-per-minute-with-golang/实现调度员/工作人员方法

我试图构建的概念是我从队列 (AWS SQS) 中检索消息,然后下载文件 (AWS S3) 并对其执行处理。

然而,调度程序/工作人员永远不会被触发,正如您从控制台日志中看到的那样,“提交”是发布的最后一条消息:

2019/01/06 22:16:35 main start
2019/01/06 22:16:35 Worker queue dispatcher started...
2019/01/06 22:16:36 SQS
2019/01/06 22:16:36 {
  Messages: [{
<REDACTED>
    }]
}
2019/01/06 22:16:36 submitting
2019/01/06 22:16:36 {<REDACTED>}
2019/01/06 22:16:36 submitted

我的main()样子是这样的:

package main
import "flag"
import "log"
import "foobar/lib"
func main() {
    var config = flag.String("config", "/etc/foo/foobar/gsg.conf", "Config file location")
    flag.Parse()
    var conf,err = foobar.LoadConfiguration(*config)
    if err != nil {
        log.Fatal(err)
    }
    foobar.NewHandler(conf)
    var sts,stsErr = foobar.GetSQSMessage(conf)
    if stsErr != nil {
        log.Fatal(stsErr)
    }
    log.Print("SQS")
    log.Print(sts)
    foobar.SubmitPayload(sts)
}

我有一个handler.go如下:

package foobar
import (
    "log"
)
import "github.com/aws/aws-sdk-go/service/sqs"
func NewHandler(appConfig *FoobarConfigStruct) error {
    JobQueue = make(chan Job, appConfig.MaxQueues)
    log.Println("main start")
    var dispatcher = NewDispatcher(appConfig.MaxWorkers)
    dispatcher.Run()
    return nil
}

func SubmitPayload(sqsOutput *sqs.ReceiveMessageOutput) error {
    var x = &sqs.Message{}
    var y = Payload{*x}
    var work = Job{Payload: y}
    JobQueue <- work
    for _,payload := range sqsOutput.Messages {
        log.Println("submitting")
        log.Println(*payload.Body)
        var item = Payload{*payload}
        var work = Job{Payload:item}
        JobQueue <- work
        log.Println("submitted")
    }
    return nil
}

dispatcher.go如下:

package foobar
import (
        "log"
)

type Dispatcher struct {
        maxWorkers int
        WorkerPool chan chan Job
}

func NewDispatcher(maxWorkers int) *Dispatcher {
        pool := make(chan chan Job, maxWorkers)
        return &Dispatcher{WorkerPool: pool, maxWorkers: maxWorkers}
}

func (d *Dispatcher) Run() {
        for i := 0; i < d.maxWorkers; i++ {
                worker := NewWorker(d.WorkerPool)
                worker.Start()
        }

        go d.dispatch()
}

func (d *Dispatcher) dispatch() {
        log.Printf("Worker queue dispatcher started...")
        for {

                select {
                case job := <-JobQueue:
                        log.Printf("Dispatcher request received")
                        go func(job Job) {
                                jobChannel := <-d.WorkerPool
                                jobChannel <- job
                        }(job)
                }
        }
}

worker.go如下:

package foobar                                                                                                                    

import (                                                                                                                           
    "log"                                                                                                                          
)                                                                                                                                  

import "github.com/aws/aws-sdk-go/service/sqs"                                                                                     

type Payload struct {                                                                                                              
    sqs sqs.Message                                                                                                                
}                                                                                                                                  

type Job struct {                                                                                                                  
    Payload Payload                                                                                                                
}                                                                                                                                  

var JobQueue chan Job                                                                                                              

type Worker struct {                                                                                                               
    WorkerPool chan chan Job                                                                                                       
    JobChannel chan Job                                                                                                            
    quit       chan bool                                                                                                           
}                                                                                                                                  

func NewWorker(workerPool chan chan Job) Worker {
    return Worker{
        WorkerPool: workerPool,
        JobChannel: make(chan Job),
        quit:       make(chan bool)}
}
func (w Worker) Start() {
    go func() {
        for {
            w.WorkerPool <- w.JobChannel

            select {
            case job := <-w.JobChannel:
                log.Printf("starting worker...");
                if err := job.Payload.ProcessPayload(); err != nil {
                    log.Printf("Error processing: %s", err.Error())
                }

            case <-w.quit:
                return
            }
        }
    }()
}

func (w Worker) Stop() {
    go func() {
        w.quit <- true
    }()
}

func (p *Payload) ProcessPayload() error {
    log.Printf("Processing payload")
    log.Printf(*p.sqs.Body)
    return nil
}

标签: go

解决方案


推荐阅读