首页 > 解决方案 > ActiveMQ RedeliveryPolicy 未按预期工作

问题描述

我做了很多研究和阅读,我只是想确保我了解 ActiveMQ RedeliveryPolicy 的工作原理。

以下代码是Scala

我创建了如下连接:

val connFactory: ActiveMQConnectionFactory = new ActiveMQConnectionFactory(s"tcp://$ACTIVE_MQ_HOST:61616")
connFactory.setTrustAllPackages(true)
connFactory.setAlwaysSyncSend(true)

我创建了一个连接

val connection: ActiveMQConnection = connFactory.createConnection().asInstanceOf[ActiveMQConnection]

我创建了一个 RedeliveryPolicy

val policy: RedeliveryPolicy = activeMQConnection.getRedeliveryPolicy()
policy.setInitialRedeliveryDelay(0)
policy.setRedeliveryDelay(1000);
policy.setUseExponentialBackOff(false)
policy.setMaximumRedeliveries(2)

我已将 RedeliveryPolicy 设置为连接并启动连接

activeMQConnection.setRedeliveryPolicy(policy)
println(s"Redelivery connection policy: ${activeMQConnection.getRedeliveryPolicy}")
activeMQConnection.start()

我创建了一个会话

val refCountQueueSession: Session = activeMQConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE)

我在队列中创建了生产者和消费者

val reference_count_queue: Queue = refCountQueueSession.createQueue("ReferenceCountQueue?consumer.exclusive=true")
val reference_count_queue_producer: MessageProducer = refCountQueueSession.createProducer(reference_count_queue)
val reference_count_consumer: MessageConsumer = refCountQueueSession.createConsumer(reference_count_queue)
reference_count_consumer.setMessageListener(new GroupConsumerMessageListener("Group Reference Count Consumer"))

GroupConsumerMessageListener我已经以之前抛出异常的方式实现了它message.acknowledge()

由于在确认之前引发了异常,消息没有按预期出队,但是 RedeliveryPolicy 没有再次重试,我没有在我的消费者的日志中看到它正在尝试再次从队列中消费。

所以我错过了什么吗?我是否必须在我做的地方保持无限循环

session.recover()

?

到目前为止,这是我能够在未确认的消息上实现重新传递的唯一方法,但感觉不是正确的方法。

标签: scalaactivemq

解决方案


推荐阅读