首页 > 解决方案 > 使用 RabbitMQ 在消费者回调中发布到队列

问题描述

我一直在搞乱 RabbitMQ 并遇到了一个关于从消费者回调将消息发布到通道的特定问题。虽然我能够很好地确认消息,但尝试将消息发布到频道并没有任何作用。

下面是一个例子:

// Stuff here...

final Connection connection = connectionFactory.newConnection();
final Channel channel = connection.createChannel();

channel.basicQos(16);

channel.basicConsume("transcoding", false, "someConsumerTag", new DefaultConsumer(channel) {

    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        executorService.submit(() -> {
            TestClass testClass = new TestClass();
            testClass.addObserver((o, arg) -> {
                // And stuff here...
                channel.basicAck(envelope.getDeliveryTag(), true);
                channel.basicPublish("", "notification", null, "message");
            });
            testClass.run();
        })
    }
});

如图所示,我正在使用ExecutoreService. 我有一个实现Runnable和扩展的类Observable。完成TestClass工作后,会通知观察者,我手动确认消息。这工作正常。

我现在想发布一条新消息,但是 RabbitMQ 从来没有收到发布的消息。我尝试创建一个新频道并将其用于发布,但这也不起作用。接下来,我认为这可能是线程问题,但是在单独的线程上发布也不起作用。

我打算尝试为发布创建单独的 RabbitMQ 连接,但这对我来说意义不大,因为通道应该是单向的。我已经阅读了 RabbitMQ 关于并发的注释,并没有看到任何突出的地方。

我错过了什么?

标签: javarabbitmq

解决方案


所以事实证明问题是消息有效负载。顺便说一句,我已经创建了两个独立的渠道——一个用于消费者,另一个用于生产者。


推荐阅读