首页 > 解决方案 > 消费者在几个小时后停止阅读消息

问题描述

我正在尝试使用服务总线(https://github.com/Azure/azure-service-bus-go)订阅用户。

当我启动应用程序时,它运行良好。但是 2/3 小时后,消费者就停止从任何队列中读取任何消息。

起初我认为这是上下文的一些问题,我最终在服务总线实现中使用了 context.Background()。我不知道它是否正确。

Consumer的主要功能:

import (
    "fmt"
    "log"
    "time"

    "google.golang.org/protobuf/proto"
    pb "servicebus_go_test.com/autorizacao_pb" //Protobufs
    function "servicebus_go_test.com/function"
)

func main() {
    servicebus_endPoint := "*******"
    servicebus_accessKeyName := "application"
    servicebus_accessKey := "******"

    sb, err := function.NewServiceBus()

    sb.SetConnectionString(servicebus_endPoint, servicebus_accessKeyName, servicebus_accessKey)

    sb.Connect()

    var queueName_papelRecurso = "FilaDeTeste_PAPELRECURSO"
    var queueName_papelUsuario = "FilaDeTeste_PAPELUSUARIO"
    var queueName_recursoAcao = "FilaDeTeste_RECURSOACAO"

    var consumerCount = 1
    var maxDeliveryCount = 5

    err = sb.Subscribe(queueName_papelRecurso, consumerCount, maxDeliveryCount, func(data []byte) error {
        papelRecurso := &pb.CriarPapelRecurso{}

        err := proto.Unmarshal(data, papelRecurso)
        if err != nil {
            fmt.Println("Failed to parse proto", err)
            return err
        }

        fmt.Println(papelRecurso)

        return nil
    })

    err = sb.Subscribe(queueName_papelUsuario, consumerCount, maxDeliveryCount, func(data []byte) error {
        papelUsuario := &pb.CriarPapelUsuario{}

        err := proto.Unmarshal(data, papelUsuario)
        if err != nil {
            fmt.Println("Failed to parse proto", err)
            return err
        }

        fmt.Println(papelUsuario)

        return nil
    })

    err = sb.Subscribe(queueName_recursoAcao, consumerCount, maxDeliveryCount, func(data []byte) error {

        recursoAcao := &pb.CriarRecursoAcao{}

        err := proto.Unmarshal(data, recursoAcao)
        if err != nil {
            fmt.Println("Failed to parse proto", err)
            return err
        }

        fmt.Println(recursoAcao)

        return nil
    })

    if err != nil {
        fmt.Println("[Main][panic] ERROR: ", err)
        panic(err)
    }
    time.Sleep(10 * time.Hour)
}

这是服务总线的实现:

package function

import (
    "context"
    "errors"
    "fmt"

    servicebus "github.com/Azure/azure-service-bus-go"
)

type ServiceBus interface {
    SetConnectionString(endPoint string, accessKeyName string, accessKey string) error
    Connect() error
    Subscribe(queueName string, consumerCount int, maxDeliveryCount int, consumer func([]byte) error) error
    Publisher(queueName string, data []byte) error
}

type serviceBusStruct struct {
    connectionString string
    ctx              context.Context
    ns               *servicebus.Namespace
    qm               *servicebus.QueueManager
}

func NewServiceBus() (ServiceBus, error) {

    ctx := context.Background()

    return &serviceBusStruct{
        connectionString: "",
        ctx:              ctx,
        ns:               nil,
    }, nil
}

func (s *serviceBusStruct) SetConnectionString(endPoint string, accessKeyName string, accessKey string) error {

    if endPoint == "" || accessKeyName == "" || accessKey == "" {
        return errors.New("[GetConnectionString] Erro - Required field not found, cant create connectionString")
    }

    s.connectionString = "Endpoint=" + endPoint + ";SharedAccessKeyName=" + accessKeyName + ";SharedAccessKey=" + accessKey

    return nil
}

func (s *serviceBusStruct) Connect() error {

    if s.ns == nil {
        ns, err := servicebus.NewNamespace(servicebus.NamespaceWithConnectionString(s.connectionString))
        if err != nil {
            return err
        }

        s.ns = ns
    }

    return nil
}

func (s *serviceBusStruct) Publisher(queueName string, data []byte) error {

    fmt.Println("[Publisher] data - bytes: ", data)

    queue, err := s.getQueue(queueName)
    if queue == nil || err != nil {
        fmt.Println("[Publisher] ERROR - to get queue: ", err)
        return err
    }

    err = queue.Send(s.ctx, servicebus.NewMessage(data))
    if err != nil {
        fmt.Println("[Publisher] ERROR - Send: ", err)
        return err
    }

    fmt.Println("[Publisher] Mensagem Publicada")

    return nil
}

func (s *serviceBusStruct) Subscribe(queueName string, consumerCount int, maxDeliveryCount int, consumer func([]byte) error) error {

    handler := func(ctx context.Context, msg *servicebus.Message) error {

        err := consumer(msg.Data)
        if err != nil {
            fmt.Println("[Subscribe][handler] ERROR - consumer: ", err)
            return msg.Abandon(ctx)
        } else {
            return msg.Complete(ctx)
        }
    }

    queue, err := s.getQueue(queueName, servicebus.QueueEntityWithMaxDeliveryCount(int32(maxDeliveryCount)))
    if queue == nil || err != nil {
        fmt.Println("[Publisher] ERROR - to get queue: ", err)
        return err
    }

    for i := 0; i < consumerCount; i++ {
        go func() {
            for {
                err = queue.Receive(s.ctx, servicebus.HandlerFunc(handler))
                if err != nil {
                    fmt.Println("[Subscribe] ERROR - Receive: ", err)
                    return
                }
            }
        }()
    }

    return nil
}

func (s *serviceBusStruct) getQueueEntity(queueName string, opts ...servicebus.QueueManagementOption) (*servicebus.QueueEntity, error) {

    if s.qm == nil {
        s.qm = s.ns.NewQueueManager()
    }

    qe, err := s.qm.Get(s.ctx, queueName)

    if qe == nil || err != nil {
        if opts != nil {
            qe, err = s.qm.Put(s.ctx, queueName, opts...)
            if err != nil {
                fmt.Println("[getQueueEntity] ERROR - Put: ", err)
                return nil, err
            }
        } else {
            qe, err = s.qm.Put(s.ctx, queueName)
            if err != nil {
                fmt.Println("[getQueueEntity] ERROR - Put: ", err)
                return nil, err
            }
        }
    }

    return qe, nil
}

func (s *serviceBusStruct) getQueue(queueName string, opts ...servicebus.QueueManagementOption) (*servicebus.Queue, error) {

    qe, err := s.getQueueEntity(queueName, opts...)
    if err != nil {
        fmt.Println("[getQueue] ERROR - getQueueEntity: ", err)
        return nil, err
    }

    queue, err := s.ns.NewQueue(qe.Name)
    if err != nil {
        fmt.Println("[getQueue] ERROR - NewQueue: ", err)
        return nil, err
    }

    return queue, nil
}


Publisher的主要功能:

import (
    "fmt"
    "log"
    "time"

    "google.golang.org/protobuf/proto"
    pb "servicebus_go_test.com/autorizacao_pb" //Protobufs
    function "servicebus_go_test.com/function"
)

func main() {

    servicebus_endPoint := "********"
    servicebus_accessKeyName := "application"
    servicebus_accessKey := "*******"

    sb, err := function.NewServiceBus()

    sb.SetConnectionString(servicebus_endPoint, servicebus_accessKeyName, servicebus_accessKey)

    sb.Connect()

    var queueName_papelRecurso = "FilaDeTeste_PAPELRECURSO"
    var queueName_papelUsuario = "FilaDeTeste_PAPELUSUARIO"
    var queueName_recursoAcao = "FilaDeTeste_RECURSOACAO"

    fmt.Println("-----------------------------------------------------------")

    papelRecurso := &pb.CriarPapelRecurso{
        DataCriacao: time.Now().String(),
        Dominio:     "domain",
        Papel:       "user-role",
        Recurso:     "auto",
        Acao:        "GET_/",
    }

    papelRecursoProto, err := proto.Marshal(papelRecurso)
    if err != nil {
        log.Fatalln("Failed to encode address book:", err)
    }

    sb.Publisher(queueName_papelRecurso, papelRecursoProto)

    fmt.Println("-----------------------------------------------------------")

    papelUsuario := &pb.CriarPapelUsuario{
        DataCriacao: time.Now().String(),
        Dominio:     "domain",
        Papel:       "user-role",
        Usuario:     "vitor",
        Hierarquia:  "",
    }

    papelUsuarioProto, err := proto.Marshal(papelUsuario)
    if err != nil {
        log.Fatalln("Failed to encode address book:", err)
    }
    sb.Publisher(queueName_papelUsuario, papelUsuarioProto)

    fmt.Println("-----------------------------------------------------------")

    recursoAcao := &pb.CriarRecursoAcao{
        DataCriacao: time.Now().String(),
        Resource:    "auto",
        Version:     "v1",
        Endpoints: []*pb.EndpointsDtos{
            {HttpMethod: "GET", Route: "/"},
        },
    }

    recursoAcaoProto, err := proto.Marshal(recursoAcao)
    if err != nil {
        log.Fatalln("Failed to encode address book:", err)
    }
    sb.Publisher(queueName_recursoAcao, recursoAcaoProto)

    fmt.Println("-----------------------------------------------------------")

    if err != nil {
        fmt.Println("[Main][panic] ERROR: ", err)
        panic(err)
    }

    time.Sleep(10 * time.Hour)
}

标签: azuregoazureservicebus

解决方案


推荐阅读