首页 > 解决方案 > 等到值出现在哈希中

问题描述

最近,我被分配了一个任务来构建一个 REST API 请求,该请求负责将消息发送到 Kafka 的入站通道,然后等待出站通道的输出。一切都很顺利,直到我遇到与等待此特定消息相关的问题。

值得指出的是,在成功到达后,消息会被写入全局消息持有者,这只是底层的 ruby​​ 哈希。下面是监控哈希的函数,直到后者被填充一些值。

def monitor_payment_hash(key)
 while @uuid.payment_create.get_message(key).nil?
   next
 end
 @uuid.payment_create.get_message(key)
end

以这种方式实施它是否合适?此时我应该尝试什么?
注意:Kafka 消费者在单独的线程中运行。

更新

我刚刚前往 ruby​​ 文档并偶然发现了一些关于频道的有趣部分。据我所知,通道是 ruby​​tines 之间通信的最佳选择(只是 goroutines 的一个花哨的名称,但在 ruby​​ 生态系统中:))

标签: rubyruby-kafka

解决方案


我认为您需要timeout一种强制停止轮询过程的方法,此外,您将来可能需要一个摘要来改进。

class Poller
  def self.poll(key:, from_source:, options: {})
    start_time = Time.now
    catch(:stop_polling) do
      loop do
        message = from_source.get_message(key)
        if message.nil?
          wait_time = Time.now - start_time
          throw :stop_polling if wait_time > options[:timeout]
        else
          yield(message) if block_given?
          throw :stop_polling
        end
      end
    end
  end
end

def monitor_payment_hash(key)
  Poller.poll key: key, from_source: @uuid.payment_create, options: {timeout: 60} do |message|
    # write to the global message holders
    # or handle message by block
    yield(message) if block_given?
  end
end

您可能需要添加更多逻辑,例如超时重试、轮询密钥列表、日志...我建议您学习如何从此源构建长轮询:https ://github.com/aws/aws-sdk -ruby/blob/version-3/gems/aws-sdk-sqs/lib/aws-sdk-sqs/queue_poller.rb


推荐阅读