azure - 消费者在几个小时后停止阅读消息
问题描述
我正在尝试使用服务总线(https://github.com/Azure/azure-service-bus-go)订阅用户。
当我启动应用程序时,它运行良好。但是 2/3 小时后,消费者就停止从任何队列中读取任何消息。
起初我认为这是上下文的一些问题,我最终在服务总线实现中使用了 context.Background()。我不知道它是否正确。
Consumer的主要功能:
import (
"fmt"
"log"
"time"
"google.golang.org/protobuf/proto"
pb "servicebus_go_test.com/autorizacao_pb" //Protobufs
function "servicebus_go_test.com/function"
)
func main() {
servicebus_endPoint := "*******"
servicebus_accessKeyName := "application"
servicebus_accessKey := "******"
sb, err := function.NewServiceBus()
sb.SetConnectionString(servicebus_endPoint, servicebus_accessKeyName, servicebus_accessKey)
sb.Connect()
var queueName_papelRecurso = "FilaDeTeste_PAPELRECURSO"
var queueName_papelUsuario = "FilaDeTeste_PAPELUSUARIO"
var queueName_recursoAcao = "FilaDeTeste_RECURSOACAO"
var consumerCount = 1
var maxDeliveryCount = 5
err = sb.Subscribe(queueName_papelRecurso, consumerCount, maxDeliveryCount, func(data []byte) error {
papelRecurso := &pb.CriarPapelRecurso{}
err := proto.Unmarshal(data, papelRecurso)
if err != nil {
fmt.Println("Failed to parse proto", err)
return err
}
fmt.Println(papelRecurso)
return nil
})
err = sb.Subscribe(queueName_papelUsuario, consumerCount, maxDeliveryCount, func(data []byte) error {
papelUsuario := &pb.CriarPapelUsuario{}
err := proto.Unmarshal(data, papelUsuario)
if err != nil {
fmt.Println("Failed to parse proto", err)
return err
}
fmt.Println(papelUsuario)
return nil
})
err = sb.Subscribe(queueName_recursoAcao, consumerCount, maxDeliveryCount, func(data []byte) error {
recursoAcao := &pb.CriarRecursoAcao{}
err := proto.Unmarshal(data, recursoAcao)
if err != nil {
fmt.Println("Failed to parse proto", err)
return err
}
fmt.Println(recursoAcao)
return nil
})
if err != nil {
fmt.Println("[Main][panic] ERROR: ", err)
panic(err)
}
time.Sleep(10 * time.Hour)
}
这是服务总线的实现:
package function
import (
"context"
"errors"
"fmt"
servicebus "github.com/Azure/azure-service-bus-go"
)
type ServiceBus interface {
SetConnectionString(endPoint string, accessKeyName string, accessKey string) error
Connect() error
Subscribe(queueName string, consumerCount int, maxDeliveryCount int, consumer func([]byte) error) error
Publisher(queueName string, data []byte) error
}
type serviceBusStruct struct {
connectionString string
ctx context.Context
ns *servicebus.Namespace
qm *servicebus.QueueManager
}
func NewServiceBus() (ServiceBus, error) {
ctx := context.Background()
return &serviceBusStruct{
connectionString: "",
ctx: ctx,
ns: nil,
}, nil
}
func (s *serviceBusStruct) SetConnectionString(endPoint string, accessKeyName string, accessKey string) error {
if endPoint == "" || accessKeyName == "" || accessKey == "" {
return errors.New("[GetConnectionString] Erro - Required field not found, cant create connectionString")
}
s.connectionString = "Endpoint=" + endPoint + ";SharedAccessKeyName=" + accessKeyName + ";SharedAccessKey=" + accessKey
return nil
}
func (s *serviceBusStruct) Connect() error {
if s.ns == nil {
ns, err := servicebus.NewNamespace(servicebus.NamespaceWithConnectionString(s.connectionString))
if err != nil {
return err
}
s.ns = ns
}
return nil
}
func (s *serviceBusStruct) Publisher(queueName string, data []byte) error {
fmt.Println("[Publisher] data - bytes: ", data)
queue, err := s.getQueue(queueName)
if queue == nil || err != nil {
fmt.Println("[Publisher] ERROR - to get queue: ", err)
return err
}
err = queue.Send(s.ctx, servicebus.NewMessage(data))
if err != nil {
fmt.Println("[Publisher] ERROR - Send: ", err)
return err
}
fmt.Println("[Publisher] Mensagem Publicada")
return nil
}
func (s *serviceBusStruct) Subscribe(queueName string, consumerCount int, maxDeliveryCount int, consumer func([]byte) error) error {
handler := func(ctx context.Context, msg *servicebus.Message) error {
err := consumer(msg.Data)
if err != nil {
fmt.Println("[Subscribe][handler] ERROR - consumer: ", err)
return msg.Abandon(ctx)
} else {
return msg.Complete(ctx)
}
}
queue, err := s.getQueue(queueName, servicebus.QueueEntityWithMaxDeliveryCount(int32(maxDeliveryCount)))
if queue == nil || err != nil {
fmt.Println("[Publisher] ERROR - to get queue: ", err)
return err
}
for i := 0; i < consumerCount; i++ {
go func() {
for {
err = queue.Receive(s.ctx, servicebus.HandlerFunc(handler))
if err != nil {
fmt.Println("[Subscribe] ERROR - Receive: ", err)
return
}
}
}()
}
return nil
}
func (s *serviceBusStruct) getQueueEntity(queueName string, opts ...servicebus.QueueManagementOption) (*servicebus.QueueEntity, error) {
if s.qm == nil {
s.qm = s.ns.NewQueueManager()
}
qe, err := s.qm.Get(s.ctx, queueName)
if qe == nil || err != nil {
if opts != nil {
qe, err = s.qm.Put(s.ctx, queueName, opts...)
if err != nil {
fmt.Println("[getQueueEntity] ERROR - Put: ", err)
return nil, err
}
} else {
qe, err = s.qm.Put(s.ctx, queueName)
if err != nil {
fmt.Println("[getQueueEntity] ERROR - Put: ", err)
return nil, err
}
}
}
return qe, nil
}
func (s *serviceBusStruct) getQueue(queueName string, opts ...servicebus.QueueManagementOption) (*servicebus.Queue, error) {
qe, err := s.getQueueEntity(queueName, opts...)
if err != nil {
fmt.Println("[getQueue] ERROR - getQueueEntity: ", err)
return nil, err
}
queue, err := s.ns.NewQueue(qe.Name)
if err != nil {
fmt.Println("[getQueue] ERROR - NewQueue: ", err)
return nil, err
}
return queue, nil
}
Publisher的主要功能:
import (
"fmt"
"log"
"time"
"google.golang.org/protobuf/proto"
pb "servicebus_go_test.com/autorizacao_pb" //Protobufs
function "servicebus_go_test.com/function"
)
func main() {
servicebus_endPoint := "********"
servicebus_accessKeyName := "application"
servicebus_accessKey := "*******"
sb, err := function.NewServiceBus()
sb.SetConnectionString(servicebus_endPoint, servicebus_accessKeyName, servicebus_accessKey)
sb.Connect()
var queueName_papelRecurso = "FilaDeTeste_PAPELRECURSO"
var queueName_papelUsuario = "FilaDeTeste_PAPELUSUARIO"
var queueName_recursoAcao = "FilaDeTeste_RECURSOACAO"
fmt.Println("-----------------------------------------------------------")
papelRecurso := &pb.CriarPapelRecurso{
DataCriacao: time.Now().String(),
Dominio: "domain",
Papel: "user-role",
Recurso: "auto",
Acao: "GET_/",
}
papelRecursoProto, err := proto.Marshal(papelRecurso)
if err != nil {
log.Fatalln("Failed to encode address book:", err)
}
sb.Publisher(queueName_papelRecurso, papelRecursoProto)
fmt.Println("-----------------------------------------------------------")
papelUsuario := &pb.CriarPapelUsuario{
DataCriacao: time.Now().String(),
Dominio: "domain",
Papel: "user-role",
Usuario: "vitor",
Hierarquia: "",
}
papelUsuarioProto, err := proto.Marshal(papelUsuario)
if err != nil {
log.Fatalln("Failed to encode address book:", err)
}
sb.Publisher(queueName_papelUsuario, papelUsuarioProto)
fmt.Println("-----------------------------------------------------------")
recursoAcao := &pb.CriarRecursoAcao{
DataCriacao: time.Now().String(),
Resource: "auto",
Version: "v1",
Endpoints: []*pb.EndpointsDtos{
{HttpMethod: "GET", Route: "/"},
},
}
recursoAcaoProto, err := proto.Marshal(recursoAcao)
if err != nil {
log.Fatalln("Failed to encode address book:", err)
}
sb.Publisher(queueName_recursoAcao, recursoAcaoProto)
fmt.Println("-----------------------------------------------------------")
if err != nil {
fmt.Println("[Main][panic] ERROR: ", err)
panic(err)
}
time.Sleep(10 * time.Hour)
}
解决方案
推荐阅读
- python - sympy 中的方程类型
- python - QProcess 不启动 python 脚本
- reactjs - 部署后第三方 API 无法使用
- python - 将字符串转换为数组python
- igraph - 如何根据来自双模网络的匹配创建单模网络(邻接矩阵)
- postgresql - Sequelize 在 docker 中不起作用(Node + Postgres)
- vba - DIR 中的下一个文件未打开
- android - 如何在主要活动中引用 androidx 工具栏?
- c# - 使用 Unity 的 FPS GAME 按钮中的暂停菜单问题
- python - Python 线程子类:__init__ 未调用