java - RabbitMQ,标头交换,未由标头路由的消息 x-match = all
问题描述
我正在尝试与队列设置标头交换,在该队列中,消息基于收件人标头进行路由。
交换是类型标头。
到目前为止,该类能够连接到交换器并将消息提供给队列。它还能够订阅队列并接收消息。每当订阅者的连接被取消时,它也会关闭连接。
当前的问题是邮件不是按收件人的标头值路由的。
给定以下课程:
import com.rabbitmq.client.*;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;
@Slf4j
public class MyQueue {
private final ConnectionFactory connectionFactory;
private Channel channel;
public MyQueue(ConnectionFactory connectionFactory) {
this.connectionFactory = connectionFactory;
}
public String sendMessage(TestTextMessage message) throws UndeliverableMessageException {
try (Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel()) {
Map<String, Object> headers = new HashMap<>();
headers.put(RabbitMqConfig.MATCH_HEADER, message.getRecipient());
AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
.deliveryMode(MessageProperties.PERSISTENT_TEXT_PLAIN.getDeliveryMode())
.priority(MessageProperties.PERSISTENT_TEXT_PLAIN.getPriority())
.headers(headers).build();
log.info("Sending message to {}", headers);
channel.basicPublish(RabbitMqConfig.EXCHANGE_NAME, "", props,
message.getMessage().getBytes(StandardCharsets.UTF_8));
log.info("RabbitMQ sent message {} to {}", message.getMessage(), message.getRecipient());
return "ok";
} catch (TimeoutException e) {
log.error("Rabbit mq timeout", e);
} catch (IOException e) {
log.error("Rabbit mq io error", e);
}
throw new UndeliverableMessageException();
}
public Flux<String> listenMessages(String recipient) throws IOException, TimeoutException {
Connection connection = connectionFactory.newConnection();
this.channel = connection.createChannel();
// The map for the headers.
Map<String, Object> headers = new HashMap<>();
headers.put("x-match", "all");
headers.put(RabbitMqConfig.MATCH_HEADER, recipient);
final String[] consumerTag = new String[1];
Flux<String> as = Flux.create(sink -> new MessageListener<String>() {
{
try {
log.info("Binding to {}", headers);
channel.queueBind(RabbitMqConfig.QUEUE_NAME, RabbitMqConfig.EXCHANGE_NAME, "",
headers);
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
log.info("Subscriber {} received a message {} with headers {}", recipient, delivery.getEnvelope(),
delivery.getProperties().getHeaders());
sink.next(delivery.getEnvelope().getDeliveryTag() + "--" + message);
//channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
};
consumerTag[0] = channel.basicConsume(RabbitMqConfig.QUEUE_NAME,
true, deliverCallback, tag -> {
sink.complete();
});
} catch (IOException e) {
log.error("RabbitMQ IOException ", e);
}
}
});
return as.doOnCancel(() -> {
try {
if (consumerTag[0] == null) {
log.error("RabbitMQ uncloseable subscription, consumerTag is null!");
channel.close();
return;
}
channel.basicCancel(consumerTag[0]);
channel.close();
log.info("RabbitMQ CANCEL subscription for recipient {}", recipient);
} catch (IOException | TimeoutException e) {
log.error("RabbitMQ channel close error", e);
}
});
}
interface MessageListener<T> {
}
}
通过以下调用声明交换
channel.exchangeDeclare(RabbitMqConfig.EXCHANGE_NAME, BuiltinExchangeType.HEADERS, true);
绑定收件人日志:
Binding to {x-match=all, message-recipient=mary}
Binding to {x-match=all, message-recipient=james}
Binding to {x-match=all, message-recipient=john}
但是,消息不匹配,就好像它们是随机路由的
Sending message to {message-recipient=james}
RabbitMQ sent message Hey there to james
Subscriber mary received a message Envelope(deliveryTag=1, redeliver=false, exchange=my-exchange, routingKey=) with headers {message-recipient=james}
Sending message to {message-recipient=james}
RabbitMQ sent message Hey there to james
Subscriber james received a message Envelope(deliveryTag=1, redeliver=false, exchange=my-exchange, routingKey=) with headers {message-recipient=james}
Sending message to {message-recipient=james}
RabbitMQ sent message Hey there to james
Subscriber john received a message Envelope(deliveryTag=1, redeliver=false, exchange=my-exchange, routingKey=) with headers {message-recipient=james}
为什么不x-match: all
匹配?
解决方案
在阅读了@Gryphon 发布的评论后,在订阅者方面,我最终为每个参与者创建了一个队列。
channel.queueDeclare(RabbitMqConfig.QUEUE_NAME + "-" + recipient,
true,
false,
false,
null)
在发布者端,代码保持不变,消息被发送到交换器,交换器将根据x-match: all
配置处理路由,将消息路由到适当的队列。
推荐阅读
- ballerina - Ballerina 包构建失败
- r - 按 data.table 中的日期范围和标识符有条件地选择多个项目
- java - 嵌套的 ViewPager 不刷新
- javascript - 我应该怎么做才能在我的网络应用程序中使用特定字体?
- swift4 - 更改 AVAsset 的首选数量
- gradle - java.util.zip.ZipException:重复条目:com/google/common/annotations/Beta.class
- list - 如何匹配列表中列表中的相同字符并分别导出结果
- php - 无法在我的数据库中正确输入日期类型数据
- java - 如何生成具有动态属性名称的类文件
- java - Eclipse:修复“类型参数的冗余规范”导致错误