首页 > 解决方案 > 已确认的消息正在重试

问题描述

我正在使用 Google Pub/Sub ruby​​ 客户端来处理发送到多个主题的消息。对于收到的每条消息,我使用 ActiveJob 将其排入队列并确认它以将其标记为已处理。

subscription.listen do |msg|
  Rails.logger.debug("Processing message with id #{msg.message_id}")

  MyJob.perform_later(JSON.parse(msg.data))
  msg.acknowledge!
  
  Rails.logger.debug("ACKed message with id #{msg.message_id}")
end

但是,通过检查日志,我看到已经确认的消息正在被一次又一次地处理(请参阅日志摘录):

2021-05-04 02:15:17.089 EDT "Processing message with id 2260372604401883"
2021-05-04 02:15:17.180 EDT "ACKed message with id 2260372604401883"
2021-05-04 02:17:58.121 EDT "Processing message with id 2260372604401883"
2021-05-04 02:17:58.186 EDT "ACKed message with id 2260372604401883"
2021-05-04 02:20:59.899 EDT "Processing message with id 2260372604401883"
2021-05-04 02:20:59.985 EDT "ACKed message with id 2260372604401883"
2021-05-04 02:22:21.083 EDT "Processing message with id 2260372604401883"
2021-05-04 02:22:21.394 EDT "ACKed message with id 2260372604401883"
2021-05-04 02:24:18.389 EDT "Processing message with id 2260372604401883"
2021-05-04 02:24:18.485 EDT "ACKed message with id 2260372604401883"
2021-05-04 02:25:54.274 EDT "Processing message with id 2260372604401883"
2021-05-04 02:25:54.385 EDT "ACKed message with id 2260372604401883"
2021-05-04 02:26:59.087 EDT "Processing message with id 2260372604401883"
2021-05-04 02:26:59.184 EDT "ACKed message with id 2260372604401883" 

Google 报告的未确认消息的数量正在急剧增加,因此我怀疑该acknowledge!方法的行为不符合预期。

标签: rubygoogle-cloud-pubsub

解决方案


我以前从未见过这个问题,但我可以建议一些事情来尝试:

  1. 记录订户确认截止日期以确保它有足够的时间MyJob.perform_later返回。这实际上只是一个健全性检查,因为它很可能是默认的 60 秒,但您不妨检查一下。
  2. 在订阅者上配置错误处理程序:
subscriber = subscription.listen do |msg|
  # process message
  msg.acknowledge!
end

Rails.logger.debug("subscriber ack deadline: #{subscriber.deadline}")

# Register to be notified when unhandled errors occur.
subscriber.on_error do |error|
  Rails.logger.error(error.message)
end
  1. 配置 gRPC 调试日志记录。通用记录器配置说明位于LOGGING.md中。要使它们适应 Rails,您可以将以下内容放入config/initializers/grpc_logging.rb
module MyLogger
  def logger
    Rails.logger
  end
end

# Define a gRPC module-level logger method before grpc/logconfig.rb loads.
module GRPC
  extend MyLogger
end

推荐阅读