Distributing Ruby code across multiple threads using batches

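Question

This is a bit of a silly question, but I'm having trouble working out the right amount of data each worker should process.

For some context, I have 60 Resque processes (workers), each strictly limited to 1 thread (this is a must in this case).

For example:

worker1: bundle exec env rake resque:workers QUEUE='queue_1' COUNT='1'
...
worker60: bundle exec env rake resque:workers QUEUE='queue_60' COUNT='1'

Each worker/thread (since each worker has one thread) needs to process a large amount of data. The problem I'm having is finding the right amount of data for each worker to process.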

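My inputs

nr_rows - the number of rows that need to be processed
batch_size - the size of each batch (we fetch the data in batches)
nr_workers - the number of workers

For example

nr_rows - 3_700_000
batch_size - 50_000
nr_workers - 60

Currently, I am using the following code: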

```ruby
per_process = ((nr_rows / batch_size) / nr_workers.to_f).ceil
start = 0
finish = per_process - 1

(1..nr_workers).each do |queue|
  (start..finish).each do |i|
    # Spawn worker here on this 'queue' and process data from #{i * 50_000} to #{i * 50_000 + 50_000} -- Note: 50_000 = batch_size
  end

  start += per_process
  finish += per_process
end
```

The problem shows up when we print start and finish for each queue. We get the following output.

nr_rows - 3_700_000
batch_size - 50_000
nr_workers - 60

queue 1 - start 0 finish 1
queue 2 - start 2 finish 3
...
queue 37 - start 72 finish 73
queue 38 - start 74 finish 75
...
queue 59 - start 116 finish 117
queue 60 - start 118 finish 119

My problem is that queue 60 will start processing data from 118 * 50_000 to 118 * 50_000 + 50_000, which is out of bounds.

Starting from queue 38, all workers will be out of bounds.
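To see exactly where the ranges leave the data, here is a minimal sketch that replays the arithmetic from the code above. The helper names `naive_ranges` and `first_out_of_bounds_queue` are made up for this illustration. It only assumes that a batch index is valid while `index * batch_size` is still below `nr_rows`.

```ruby
# Inclusive batch-index range handed to each queue by the scheme above:
# every queue gets ceil(batches / workers) consecutive batch indexes.
def naive_ranges(nr_rows, batch_size, nr_workers)
  per_process = ((nr_rows / batch_size) / nr_workers.to_f).ceil
  Array.new(nr_workers) do |q|
    start = q * per_process
    (start..start + per_process - 1)
  end
end

# 1-based number of the first queue whose range contains a batch index
# that starts at or past nr_rows; nil if every queue stays in bounds.
def first_out_of_bounds_queue(nr_rows, batch_size, nr_workers)
  limit = (nr_rows / batch_size.to_f).ceil # valid indexes are 0..limit-1
  idx = naive_ranges(nr_rows, batch_size, nr_workers).index { |r| r.last >= limit }
  idx && idx + 1
end

ranges = naive_ranges(3_700_000, 50_000, 60)
puts "queue 60 gets batch indexes #{ranges.last}"
puts "first queue out of bounds: #{first_out_of_bounds_queue(3_700_000, 50_000, 60)}"
```

With these inputs there are only 74 batches (indexes 0 to 73), but rounding up to 2 batches per queue hands out 120 indexes, so the later queues have nothing valid left to process.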

Any suggestions?

Thanks, and sorry if this is a silly question.

Tags: ruby-on-rails, ruby, multithreading, parallel-processing

Solution

Here is my suggestion:

# Variable Initialization
nr_rows = 3_700_000
batch_size = 50_000
nr_workers = 60

num_batches = (nr_rows / batch_size.to_f).ceil
# Generation of the batch info (first_row...last_row) in this form : [first_row1, last_row1/first_row2, last_row2/first_row3, ..., first_row_n]
batch_bounds = Array.new(num_batches) { |i| i * batch_size }
# We make sure the last batch goes to the last row : ..., first_row_n, last_row_n]
batch_bounds << nr_rows

# We calculate the exact amount of batches to distribute for each process
per_process = num_batches / nr_workers.to_f
# Since we can't distribute a non-integer batch we keep the remaining info to make sure we'll distribute the batch correctly
remaining = 0
# batch index in the batch_bounds array
batch_index = 0

# Distribution of the batches among all workers
nr_workers.times do |process_index|
    # Adjust the batch remaining
    remaining += per_process
    # We take only the integer part to process full batches
    num_to_process = remaining.to_i

    # We distribute the batch to the current worker
    num_to_process.times do
        # batch_bounds[batch_index]...batch_bounds[batch_index+1] corresponds to the rows to process in the current batch
        # send batch_bounds[batch_index]...batch_bounds[batch_index+1] to process_index
        puts "#{process_index} : #{batch_bounds[batch_index]...batch_bounds[batch_index+1]}"
        # We update the batch index
        batch_index += 1
    end

    # Subtract the batches we just handed out, keeping only the fractional remainder
    remaining -= num_to_process
end
# Just in case floating-point rounding left exactly one batch undistributed
if batch_index == batch_bounds.size - 2
    # process the remaining batch
    puts "#{nr_workers-1} : #{batch_bounds[batch_index]...batch_bounds[batch_index+1]}"
end

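This code looks a bit complicated, but the end goal is to balance the batches across the workers and to make sure that, if the last batch is not a full 50_000-row batch, the last worker receives a smaller batch.

With this code, if there are more batches than workers, some workers will get 2 or more batches, and if there are fewer batches than workers, some workers will not get any batch at all.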
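For comparison, the same goal can be reached with integer arithmetic only. The sketch below is an alternative, not the code above: it uses `divmod` so no floating-point remainder has to be tracked. The method name `distribute_batches` is hypothetical, and it assumes `nr_workers` is greater than zero. Unlike the running-remainder approach, it gives the extra batches to the first workers instead of spreading them out.

```ruby
# Splits nr_rows into exclusive row ranges of at most batch_size rows and
# assigns them to workers so that per-worker counts differ by at most one.
# Returns an array with one entry per worker, each an array of row ranges.
def distribute_batches(nr_rows, batch_size, nr_workers)
  # The last range is clamped to nr_rows, so it may be shorter than batch_size.
  batches = (0...nr_rows).step(batch_size).map do |first|
    (first...[first + batch_size, nr_rows].min)
  end

  # Every worker gets `base` batches; the first `extra` workers get one more.
  base, extra = batches.size.divmod(nr_workers)
  index = 0
  Array.new(nr_workers) do |worker|
    count = base + (worker < extra ? 1 : 0)
    slice = batches[index, count]
    index += count
    slice
  end
end

plan = distribute_batches(3_700_000, 50_000, 60)
plan.each_with_index do |ranges, worker|
  ranges.each { |rows| puts "#{worker} : #{rows}" }
end
```

Each range is exclusive at the end (`first...last`), matching the `batch_bounds[batch_index]...batch_bounds[batch_index+1]` convention used in the answer.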
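Both cases can be checked with a small sketch that wraps the answer's running-remainder logic in a helper and returns only the number of batches per worker. The name `batches_per_worker` is made up for this example, and the last line of the method is my own stand-in for the "just in case" guard: it hands any batch lost to float rounding to the last worker. It assumes `nr_workers` is greater than zero.

```ruby
# Number of batches each worker receives when a fractional share is
# accumulated per worker and only whole batches are handed out.
def batches_per_worker(num_batches, nr_workers)
  per_process = num_batches / nr_workers.to_f
  remaining = 0.0
  counts = Array.new(nr_workers) do
    remaining += per_process
    take = remaining.to_i # whole batches available so far
    remaining -= take     # keep only the fractional part
    take
  end
  # Floating-point rounding can leave one batch over; give it to the last worker.
  counts[-1] += num_batches - counts.sum
  counts
end

more = batches_per_worker(74, 60) # more batches than workers
less = batches_per_worker(3, 60)  # fewer batches than workers

puts "74 batches: min #{more.min}, max #{more.max}, total #{more.sum}"
puts "3 batches: #{less.count(0)} idle workers, total #{less.sum}"
```

With 74 batches every worker gets 1 or 2 batches. With 3 batches only three workers get one batch and the other 57 stay idle.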

