grails - 使用带有 rabbitmq-native 插件 (3.4.4) 的 Grails 3.3.2,prefetch=1 的单个消费者在发生异常后停止消费消息
问题描述
我已经重试设置为true。我的理解是,信息应该不断地不断地传递给消费者。相反,它只是坐在那里不消耗重新排队的消息或任何新消息。我将 com.budjb 和 com.rabbitmq 和 org.springframework.amqp 的日志一直打开到 TRACE 并且没有看到任何断开连接... Heeelppp
应用程序.groovy
rabbitmq {
uri = new URI(System.env.CLOUDAMQP_URL ?: "amqp://test:test@localhost/test")
username = uri.userInfo.split(":")[0]
password = uri.userInfo.split(":")[1]
connections = [
[name : 'main',
host : uri.host,
port : 5672,
username : username,
requestedHeartbeat: 10,
automaticReconnect: true,
virtualHost : uri.path.substring(1), //remove leading slash
password : password]
]
queues = [[name: com.coco.jms.RabbitQueues.INDEX_TRANSACTION.destinationName, autoDelete: false, durable: true, exclusive: false]]
消费者:
class IndexTransactionConsumer implements MessageConsumerEventHandler {
static rabbitConfig = [
connection: 'main',
consumers : 1,
queue : Boolean.valueOf((String) System.getProperty("is_amqp_consumer")) ? RabbitQueues.INDEX_TRANSACTION.destinationName : null,
transacted: true,
autoAck : AutoAck.POST,
retry : true
]
def handleMessage(Map body, MessageContext messageContext) {
log.info("RABBITMQ - *CONSUME* Received event to index transaction (Map). " + body)
throw new Exception("Force fail")
}
....
}
更新 看来,当 transacted=true 和 autoAck = AutoAck.POST 在 AbstractConsumerContext.groovy 内触发的 txRollback() 正在阻止 basicReject nack 到达 RabbitMQ 服务器。
if (configuration.getTransacted()) {
context.getChannel().txRollback()
}
if (configuration.getAutoAck() == AutoAck.POST) {
context.getChannel().basicReject(context.getEnvelope().deliveryTag, configuration.getRetry())
}
解决方案
我通过不允许异常逃脱侦听器并自己管理 ack/nack 解决了我的问题。我认为rabbitmq-native插件中有一个很大的地方transacted = true。在我看来,它正在回滚假设在捕获异常时触发的 nack。
def handleMessage(Map body, MessageContext context) {
log.info("RABBITMQ - *CONSUME* Received event. " + body)
try {
//ensure casting by JMS to Integer is reverted
body.conflictIDList = body.conflictIDList.collect { ((Number) it).toLong() }
//do work
context.channel.basicAck(context.envelope.deliveryTag, false)
} catch (Exception ex) {
ConsumerUtility.handleMessageException(rabbitMessagePublisher, body, context, ex)
}
}
来自消费者公用事业
def
static handleMessageException(RabbitMessagePublisher rabbitMessagePublisher, Map body, MessageContext context, Throwable ex) {
log.warn("E_ception caught attempting digest message sent to " + context.envelope.routingKey + ". body=" + body + ", reason=" + ex.message)
if (body.retryCount < 3) {
//pull current message off queue, sleep thread and republish onto back of queue
context.channel.basicAck(context.envelope.deliveryTag, false)
body.retryCount = body.retryCount + 1
//upon failure sleep for 3, 6, then 9 seconds
sleep(3000 * (Integer) body.retryCount)
rabbitMessagePublisher.send {
channel = context.channel
routingKey = context.envelope.routingKey
setBody(body)
}
} else {
log.error("Rejecting message after three failed tries onto DLQ. body=" + body, ex)
context.channel.basicReject(context.envelope.deliveryTag, false)
}
}
推荐阅读
- javascript - 'position:absolute' 是否总是 'display:block'
- html - 右侧的主按钮首先获得焦点是不是很烦人?
- azure-devops - Azure DevOps 服务器导入管道功能忽略审批者定义
- python - Azure Python SDK 是否没有用于预配存储帐户的 create_or_update?
- python - 带有 zeep 的 xsd:string[] 的 SOAP 字符串列表
- php - 如何在这个 css 字符串中生成随机数(laravel)
- asp.net-core - Asp.Net Core cookie 身份验证选项登录路径通过 HTTP 路由
- ssh - 使用 SSH 将文件发送到 SFTP 服务器的 WSO2 代理服务无法进行身份验证
- java - 如果在springboot的结果中合并/组合,缓存方法(使用咖啡因)是否可能返回部分响应?
- python - 几个numba函数的并行编译