RabbitMQ, headers exchange, messages not routed by headers with x-match = all

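Problem description

I am trying to set up a headers exchange with a queue, where messages are routed based on a recipient header.

The exchange is of type headers.

So far, the class is able to connect to the exchange and publish messages to the queue. It is also able to subscribe to the queue and receive messages. Whenever a subscriber's subscription is cancelled, it also closes the connection.

The current problem is that messages are not routed by the recipient header value.

Given the following class: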

import com.rabbitmq.client.*;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;

@Slf4j
public class MyQueue {

private final ConnectionFactory connectionFactory;
private Channel channel;


public MyQueue(ConnectionFactory connectionFactory) {
    this.connectionFactory = connectionFactory;
}

public String sendMessage(TestTextMessage message) throws UndeliverableMessageException {
    try (Connection connection = connectionFactory.newConnection();
         Channel channel = connection.createChannel()) {

        Map<String, Object> headers = new HashMap<>();
        headers.put(RabbitMqConfig.MATCH_HEADER, message.getRecipient());
        AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
                .deliveryMode(MessageProperties.PERSISTENT_TEXT_PLAIN.getDeliveryMode())
                .priority(MessageProperties.PERSISTENT_TEXT_PLAIN.getPriority())
                .headers(headers).build();

        log.info("Sending message to {}", headers);

        channel.basicPublish(RabbitMqConfig.EXCHANGE_NAME, "", props,
                message.getMessage().getBytes(StandardCharsets.UTF_8));

        log.info("RabbitMQ sent message {} to {}", message.getMessage(), message.getRecipient());
        return "ok";
    } catch (TimeoutException e) {
        log.error("Rabbit mq timeout", e);
    } catch (IOException e) {
        log.error("Rabbit mq io error", e);
    }
    throw new UndeliverableMessageException();
}

public Flux<String> listenMessages(String recipient) throws IOException, TimeoutException {
    Connection connection = connectionFactory.newConnection();
    this.channel = connection.createChannel();

    // The map for the headers.
    Map<String, Object> headers = new HashMap<>();
    headers.put("x-match", "all");
    headers.put(RabbitMqConfig.MATCH_HEADER, recipient);

    final String[] consumerTag = new String[1];
    Flux<String> as = Flux.create(sink -> new MessageListener<String>() {
        {
            try {
                log.info("Binding to {}", headers);
                channel.queueBind(RabbitMqConfig.QUEUE_NAME, RabbitMqConfig.EXCHANGE_NAME, "",
                        headers);
                DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                    String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
                    log.info("Subscriber {} received a message {} with headers {}", recipient, delivery.getEnvelope(),
                            delivery.getProperties().getHeaders());

                    sink.next(delivery.getEnvelope().getDeliveryTag() + "--" + message);
                    //channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
                };

                consumerTag[0] = channel.basicConsume(RabbitMqConfig.QUEUE_NAME,
                        true, deliverCallback, tag -> {
                            sink.complete();
                        });

            } catch (IOException e) {
                log.error("RabbitMQ IOException ", e);
            }
        }

    });

    return as.doOnCancel(() -> {
        try {
            if (consumerTag[0] == null) {
                log.error("RabbitMQ uncloseable subscription, consumerTag is null!");
                channel.close();
                return;
            }
            channel.basicCancel(consumerTag[0]);
            channel.close();
            log.info("RabbitMQ CANCEL subscription for recipient {}", recipient);
        } catch (IOException | TimeoutException e) {
            log.error("RabbitMQ channel close error", e);
        }
    });
}

interface MessageListener<T> {

}
}

The exchange is declared with the following call:

channel.exchangeDeclare(RabbitMqConfig.EXCHANGE_NAME, BuiltinExchangeType.HEADERS, true);

Log output when binding the recipients:

Binding to {x-match=all, message-recipient=mary}
Binding to {x-match=all, message-recipient=james}
Binding to {x-match=all, message-recipient=john}

Three recipients are bound with x-match.

However, the messages are not matched; they appear to be routed at random:

Sending message to {message-recipient=james}
RabbitMQ sent message Hey there to james
Subscriber mary received a message Envelope(deliveryTag=1, redeliver=false, exchange=my-exchange, routingKey=) with headers {message-recipient=james}

Sending message to {message-recipient=james}
RabbitMQ sent message Hey there to james
Subscriber james received a message Envelope(deliveryTag=1, redeliver=false, exchange=my-exchange, routingKey=) with headers {message-recipient=james}

Sending message to {message-recipient=james}
RabbitMQ sent message Hey there to james
Subscriber john received a message Envelope(deliveryTag=1, redeliver=false, exchange=my-exchange, routingKey=) with headers {message-recipient=james}

Why doesn't x-match: all match?

Tags: java, rabbitmq

Solution


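After reading the comment posted by @Gryphon, I ended up creating a queue for each participant on the subscriber side. (With a single shared queue, a message that matches any one binding lands in that queue and goes to whichever consumer is next, regardless of recipient.)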

channel.queueDeclare(RabbitMqConfig.QUEUE_NAME + "-" + recipient,
    true,   // durable
    false,  // exclusive
    false,  // autoDelete
    null);  // no extra arguments

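On the publisher side, the code stays the same: messages are sent to the exchange, which handles routing according to the x-match: all configuration and routes each message to the appropriate queue.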
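To make the difference concrete, here is a minimal in-memory sketch of how x-match: all decides which queues receive a message. It is a simplified model, not the RabbitMQ client API: the class `HeadersRoutingSketch`, its methods, and the queue name `my-queue` are made up for illustration, while the header name `message-recipient` is taken from the logs above. It assumes the usual rule that binding arguments starting with `x-` are not compared against the message headers.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.TreeSet;

/** Simplified in-memory model of headers-exchange routing; not the RabbitMQ client API. */
final class HeadersRoutingSketch {

    /** A binding: the target queue name plus the binding arguments (including x-match). */
    static final class Binding {
        final String queue;
        final Map<String, Object> args;

        Binding(String queue, Map<String, Object> args) {
            this.queue = queue;
            this.args = args;
        }
    }

    /** Builds binding arguments like the logged {x-match=all, message-recipient=mary}. */
    static Map<String, Object> bindingArgs(String recipient) {
        Map<String, Object> args = new HashMap<>();
        args.put("x-match", "all");
        args.put("message-recipient", recipient);
        return args;
    }

    /** x-match=all: every argument not starting with "x-" must equal the same-named header. */
    static boolean matchesAll(Map<String, Object> args, Map<String, Object> headers) {
        for (Map.Entry<String, Object> arg : args.entrySet()) {
            if (arg.getKey().startsWith("x-")) {
                continue; // x-match itself is not compared against the message
            }
            if (!headers.containsKey(arg.getKey())
                    || !Objects.equals(arg.getValue(), headers.get(arg.getKey()))) {
                return false;
            }
        }
        return true;
    }

    /** Returns the names of all queues that have at least one matching binding. */
    static Set<String> route(List<Binding> bindings, Map<String, Object> headers) {
        Set<String> queues = new TreeSet<>();
        for (Binding binding : bindings) {
            if (matchesAll(binding.args, headers)) {
                queues.add(binding.queue);
            }
        }
        return queues;
    }

    /** The question's setup: every recipient binds the same queue (hypothetical name "my-queue"). */
    static List<Binding> sharedQueueBindings(List<String> recipients) {
        List<Binding> bindings = new ArrayList<>();
        for (String recipient : recipients) {
            bindings.add(new Binding("my-queue", bindingArgs(recipient)));
        }
        return bindings;
    }

    /** The fix: one queue per recipient, named QUEUE_NAME + "-" + recipient. */
    static List<Binding> perRecipientBindings(List<String> recipients) {
        List<Binding> bindings = new ArrayList<>();
        for (String recipient : recipients) {
            bindings.add(new Binding("my-queue-" + recipient, bindingArgs(recipient)));
        }
        return bindings;
    }

    public static void main(String[] args) {
        List<String> recipients = Arrays.asList("mary", "james", "john");
        Map<String, Object> headers = new HashMap<>();
        headers.put("message-recipient", "james");

        // Shared queue: james's binding matches, so the message lands in the one queue
        // that mary and john also consume from.
        System.out.println(route(sharedQueueBindings(recipients), headers));  // [my-queue]

        // Per-recipient queues: only james's queue receives the message.
        System.out.println(route(perRecipientBindings(recipients), headers)); // [my-queue-james]
    }
}
```

In the real client, this would presumably mean that each subscriber passes its own queue name (`RabbitMqConfig.QUEUE_NAME + "-" + recipient`) to `queueBind` and `basicConsume` instead of the shared `RabbitMqConfig.QUEUE_NAME`. The header matching itself was working all along; the consumers were simply sharing one queue, and RabbitMQ dispatches a queue's messages round-robin across its consumers by default.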

