首页 > 解决方案 > 去继承任务处理

问题描述

我正在编写一个 Go 从属程序,它在 AQMP 消息队列上侦听任务。这些任务包含输入数据和输出数据,json 编码。不同的任务可以有完全不同的输入/输出结构。

我正在尝试找到一种方法来编写我的 registerTask 函数,以便我可以将该函数重新用于多个任务并将 JSON 编码为所需的 Input 结构。但是,如果输入与特定任务输入不匹配,JSON 解析器应该会抛出错误。我尝试接受 interface{},但这允许任何东西并使“foo”针对任何自定义结构进行验证。想法?

这整个作文对我来说是新的。在 PHP 中,我会用一个抽象基类来解决这个问题。

到目前为止,我注册新任务的代码:

func sendResult(chann *amqp.Channel, task tasks.TaskImpl, output tasks.TaskOutput) error {
    log.Printf("Sending response for %s: %v", task.GetName(), output)

    jsonResponse, err := json.Marshal(output)
    if err != nil {
        log.Printf("Could not encode task %s to JSON: %v", task.GetName(), output)
        return err
    }

    msg := amqp.Publishing{
        DeliveryMode: amqp.Persistent,
        Timestamp:    time.Now(),
        ContentType:  "text/plain",
        Body:         jsonResponse,
    }

    return chann.Publish(QUEUE_RESULT, task.GetName(), false, false, msg)
}

func registerTask(chann *amqp.Channel, task tasks.TaskImpl) {
    log.Println("Registering task: " + task.GetName())

    deliverChann, err := chann.Consume(QUEUE_TODO, task.GetName(), false, false, false, false, nil)
    if err != nil {
        log.Fatalf("Could not consume: %v", err)
    }

    go func() {
        for taskMsg := range deliverChann {
            log.Printf("Task received: %v", taskMsg)

            // check if it's in UTF-8
            if strings.ToLower(taskMsg.ContentEncoding) != "utf-8" {
                log.Printf("Warning: task %s is not encoded in utf-8 but: %s", task.GetName(), taskMsg.ContentEncoding)
                taskMsg.Reject(false)
            }

            // check if it's JSON
            if strings.ToLower(taskMsg.ContentType) != "application/json" {
                log.Printf("Warning: task %s is not json but: %s", task.GetName(), taskMsg.ContentType)
                taskMsg.Reject(false)
            }

            log.Printf("Decoding %v", taskMsg.Body)
            if err := json.Unmarshal(taskMsg.Body, &task; err != nil {
                log.Printf("Could not decode task %s body: %v", task.GetName(), err)
                taskMsg.Reject(false)
                continue
            }

            log.Printf("Executing %s with args %v (from %v)", task.GetName(), task, taskMsg.Body)
            output, err := task.Process(input)
            if err != nil {
                log.Printf("Task '%s' error: %s", task.GetName(), err)
                taskMsg.Reject(true)
                continue
            }

            if err := sendResult(chann, task, output); err != nil {
                log.Printf("Could not send result for task: " + task.GetName())
                taskMsg.Reject(true)
                continue
            }

            if err := taskMsg.Ack(false); err != nil {
                log.Println("Could not acknowledge: " + err.Error())
            }
        }
    }()
}

我的任务代码:

type TaskInput struct{}
type TaskOutput struct{}

type TaskData interface {
    Input TaskInput `json:"input"`
    Output TaskOutput `json:"output"`
}

type TaskImpl interface {
    GetName() string
    Process(msg *TaskData) (*TaskData, error)
}


type SampleTaskInput struct {
    TaskInput
    Arg string
}
type SampleTaskOutput struct {
    TaskOutput
    Result string
}

type SampleTask struct {
    TaskImpl
}

func (SampleTask) GetName() string {
    return "sample"
}

func (SampleTask) Process(msg *TaskData) (*TaskData, error){

    log.Printf("Executing sampletask with arg: %v", msg.Input)

    msg.Output = SampleTaskOutput{}

    return SampleTaskOutput{
        Result: "String was " + data.Arg,
    }, nil
}

标签: gocomposition

解决方案


推荐阅读