go - 从 rabbitmq 获取已发布消息的响应。戈朗
问题描述
我已经实现了使用消息的工作者,用它们做某种类型的工作并在队列上发布结果。
我有 api,它将接收 http 请求,向工作人员发布消息并等待工作人员发送响应。
我有两个队列,一个是工人,另一个是网关。
Api 将 -> 在工作队列上发布事件。使用网关队列中的事件(等待响应) Worker will -> 使用工作队列中的事件。在网关队列上发布事件。
工人工作得很好,我从来没有遇到过问题。所以我将专注于api,因为那里有错误。
问题:
每次我的api发布消息时。它首先会生成一些 uuid,我们会在地图中记住这些 uuid,以便在我们使用响应时。我们可以匹配响应是哪个请求。
当我们消费消息时,第一次一切正常,我们可以将响应与请求相匹配。但是第二次,我们在地图中找不到该响应。尽管我已经添加了它。
下面是一些代码示例。
存储,将与地图一起使用的接口
package main
import "fmt"
type ChannelStorage interface {
Add(uid string, message chan ResponseMessage)
Delete(uid string)
Get(uid string) chan ResponseMessage
}
func NewChannelStorage() ChannelStorage {
return ChannelMapStorage{
channelMap: make(map[string]chan ResponseMessage),
}
}
type ChannelMapStorage struct {
channelMap map[string]chan ResponseMessage
}
func (storage ChannelMapStorage) Add(uid string, message chan ResponseMessage) {
fmt.Println(fmt.Sprintf("Adding Message: %s ", uid))
storage.channelMap[uid] = message
}
func (storage ChannelMapStorage) Delete(uid string) {
fmt.Println(fmt.Sprintf("Deleting Message: %s ", uid))
delete(storage.channelMap, uid)
}
func (storage ChannelMapStorage) Get(uid string) chan ResponseMessage {
fmt.Println(fmt.Sprintf("Getting Message: %s ", uid))
return storage.channelMap[uid]
}
这是我的发布者,它将向工作队列发送事件。
package main
import (
"fmt"
"github.com/streadway/amqp"
)
var channel *amqp.Channel
func init() {
// Connect to the rabbit.
conn, err := amqp.Dial(rabbitConfig.uri)
if err != nil {
panic(err)
}
// create channel
channel, err = conn.Channel()
if err != nil {
panic(err)
}
}
func publish(queueName string , data []byte, id string) error {
// publish message
return channel.Publish(
"", // exchange
queueName, // routing key
false, // mandatory
false, // immediate
amqp.Publishing{
CorrelationId: id,
ContentType: "text/plain",
Body: data,
},
)
}
这是发件人,这是代码的一部分,所有操作都发生在这里。它的工作是连接到rabbitmq,消费事件。将事件与请求匹配并通知我们得到响应。
package main
import (
"encoding/json"
"errors"
"fmt"
"github.com/streadway/amqp"
"log"
"time"
)
func init() {
conn, err := amqp.Dial(rabbitConfig.uri)
failOnError(err, "Publisher failed to connect to the rabbitmq")
// Create channel
channel, err := conn.Channel()
failOnError(err, "Publisher failed to create channel")
// Create queue
queue, err := channel.QueueDeclare(
RECIEVE_QUEUE_NAME, // channelname
true, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to create queue for consumer")
// channel
messages, err := channel.Consume(
queue.Name, // queue
"", // consumer
true, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
failOnError(err, "Failed on consuming event")
go func() {
for message:= range messages {
go handleMessage(message)
}
}()
}
func sendMessage(name string, id string ,requestType string) ([]byte, int, error) {
// Create new task message
message := TaskMessage{
Uid: uid(),
ReplyTo: RECIEVE_QUEUE_NAME,
Type: requestType,
Name: name,
Id: id,
}
data ,err := json.Marshal(message)
if err != nil {
fmt.Println(fmt.Sprintf("Cant unmarshall message %s", err.Error()))
return nil, 0, err
}
err = publish(SEND_QUEUE_NAME, data, message.Uid)
if err != nil {
fmt.Println(fmt.Sprintf("Cant publish message %s", err.Error()))
return nil, 0, err
}
// whenever we send message, we need to add it to the waiting response channel
rchannel := make(chan ResponseMessage)
channelStorage.Add(message.Uid, rchannel)
// Wait for the response
select {
case response := <- rchannel:
fmt.Println(fmt.Sprintf("Sending response: %s ", message.Uid))
data := response.Response
code := response.StatusCode
channelStorage.Delete(message.Uid)
return data, code, nil
case <-time.After(3 * time.Second):
// remove channel from rchans
channelStorage.Delete(message.Uid)
// Return timeout error.
return nil, 0, errors.New("Response timed out on rabbit.")
}
}
func handleMessage(msg amqp.Delivery) {
// Parse message.
response := &ResponseMessage{}
// Parse response.
err := json.Unmarshal(msg.Body, response)
if err != nil {
log.Printf("ERROR: fail unmarshl: %s", msg.Body)
return
}
// find waiting channel(with uid) and forward the reply to it
if channel := channelStorage.Get(response.Uid); channel != nil {
channel <- *response
}
}
方法 sendMessage() 将发布事件,等待消费响应。将其映射到我们的请求并返回结果。
在这里我将创建新任务
// Create new task message
message := TaskMessage{
Uid: uid(),
ReplyTo: RECIEVE_QUEUE_NAME,
Type: requestType,
Name: name,
Id: id,
}
data ,err := json.Marshal(message)
if err != nil {
fmt.Println(fmt.Sprintf("Cant unmarshall message %s", err.Error()))
return nil, 0, err
}
使用来自 publisher.go 的方法发布任务
err = publish(SEND_QUEUE_NAME, data, message.Uid)
if err != nil {
fmt.Println(fmt.Sprintf("Cant publish message %s", err.Error()))
return nil, 0, err
}
使用我们的存储接口来映射具有我们生成的唯一 ID 的响应。
rchannel := make(chan ResponseMessage)
channelStorage.Add(message.Uid, rchannel)
等待将通知的通道是当我们收到对已发布事件的响应时。如果我们在 3 秒内没有得到响应,我们的超时。
case response := <- rchannel:
fmt.Println(fmt.Sprintf("Sending response: %s ", message.Uid))
data := response.Response
code := response.StatusCode
channelStorage.Delete(message.Uid)
return data, code, nil
case <-time.After(3 * time.Second):
// remove channel from rchans
channelStorage.Delete(message.Uid)
// Return timeout error.
return nil, 0, errors.New("Response timed out on rabbit.")
}
当我们使用消息但尝试将 id 与响应映射时,就会出现问题。它说它为我们地图中的 id 返回 nil。
func handleMessage(msg amqp.Delivery) {
// Parse message.
response := &ResponseMessage{}
// Parse response.
err := json.Unmarshal(msg.Body, response)
if err != nil {
log.Printf("ERROR: fail unmarshl: %s", msg.Body)
return
}
// find waiting channel(with uid) and forward the reply to it
if channel := channelStorage.Get(response.Uid); channel != nil {
channel <- *response
}
}
我的 main.go 看起来像这样,它将创建全局存储并运行 http 服务器:
package main
import (
"fmt"
"os"
)
var channelStorage ChannelStorage
func main() {
channelStorage = NewChannelStorage()
err := SetupRouter(NewApi()).Run(":8080")
if err != nil {
fmt.Println(err)
os.Exit(1)
}
}
因为我对golang没有太多经验。问题可能会发生,因为我创建了全局存储或类似的东西。
解决方案
首先:您的存储现在不是线程安全的,您应该使用 sync.Mutex 例如。
type ChannelMapStorage struct {
m sync.Nutex
channelMap map[string]chan ResponseMessage
}
func (storage *ChannelMapStorage) Add(uid string, message chan ResponseMessage) {
storage.m.Lock()
fmt.Println(fmt.Sprintf("Adding Message: %s ", uid))
storage.channelMap[uid] = message
storage.m.Unlock()
}
所有其他方法也是如此。
第二:您应该在发布之前将消息添加到存储中,如果发布失败则将其删除。
data ,err := json.Marshal(message)
if err != nil {
fmt.Println(fmt.Sprintf("Cant unmarshall message %s", err.Error()))
return nil, 0, err
}
rchannel := make(chan ResponseMessage)
channelStorage.Add(message.Uid, rchannel)
err = publish(SEND_QUEUE_NAME, data, message.Uid)
if err != nil {
channelStorage.Del(message.Uid)
fmt.Println(fmt.Sprintf("Cant publish message %s", err.Error()))
return nil, 0, err
}
推荐阅读
- javascript - 无法在反应中导入模块
- ansible - 如何在ansible中限制set_fact变量的范围
- php - php 不使用 file.php?data=value 将数据添加到数据库
- react-native - 在 react-native-web (expo) 中聚焦时更改 TextInput 的边框颜色
- matlab - 如何从高斯分布中随机初始化深度网络中层的权重?
- vue.js - 直接在浏览器中访问 url 时 vue-router 不起作用 - 404 错误
- python - 没有名为“sklearn.utils.linear_assignment_”的模块
- php - 如何防止 PHP 将 '×' 重命名为 'x' 符号?
- asp.net - GridView 始终具有相同的值
- excel - 标准的 VBA 组合框列表