ruby-on-rails - 使用批处理在多个线程上分发 Ruby 代码
问题描述
这是一个有点愚蠢的问题,但是我在分配工作人员需要处理的正确数据量时遇到了一些问题。
对于一些上下文,我有 60 个 Resque 进程(工作者),每个都严格设置为只有1 个线程(在这种情况下这是必须的)。
比如worker1: bundle exec env rake resque:workers QUEUE='queue_1' COUNT='1'
...
worker60: bundle exec env rake resque:workers QUEUE='queue_60' COUNT='1'
每个工人/线程(因为每个工人都有一个线程)需要处理大量数据。我遇到的问题是找到每个工人需要处理的适量数据。
我的输入
nr_rows
- 需要处理
batch_size
的数据数量 - 我们分批获取数据
nr_workers
- 工人数量
例如
nr_rows - 3_700_000
batch_size - 50_000
nr_workers - 60
目前,我正在使用以下代码:
`
per_process = ((nr_rows / batch_size) / nr_processes.to_f).ceil
start = 0
finish = per_process - 1
(1..nr_processes).each do |queue|
(start..finish).each do |i|
# Spawn worker here on this 'queue' and process data from #{i *50_00} to #{i * 50_000 + 50_000} -- Note: 50_000 = batch_size
end
start += per_process
finish += per_process
end
`
如果我们打印出start
并且finish
对于每个queue
我们将得到以下输出,则问题如下。
nr_rows - 3_700_00
batch_size - 50_000
nr_workers - 60
queue 1 - start 0 finish 1
queue 2 - start 2 finish 3
...
queue 27 - start 72 finish 73
queue 28 - start 74 finish 75
...
queue 59 - start 136 finish 137
queue 60 - start 138 finish 139
我的问题是队列 60 将开始处理来自
138 * 50_000
to 的数据138 * 50_000 + 50_000
,这超出了界限。
从队列 28 开始......所有工作人员都将超出范围。
有什么建议么?
谢谢,对不起,如果这是一个愚蠢的问题。
解决方案
这是我的建议:
# Variable Initialization
nr_rows = 3_700_000
batch_size = 50_000
nr_workers = 60
num_batches = (nr_rows / batch_size.to_f).ceil
# Generation of the batch info (first_row...last_row) in this form : [first_row1, last_row1/first_row2, last_row2/first_row3, ..., first_row_n]
batch_bounds = Array.new(num_batches) { |i| i * batch_size }
# We make sure the last batch goes to the last row : ..., first_row_n, last_row_n]
batch_bounds << nr_rows
# We calculate the exact amount of batches to distribute for each process
per_process = num_batches / nr_workers.to_f
# Since we can't distribute a non-integer batch we keep the remaining info to make sure we'll distribute the batch correctly
remaining = 0
# batch index in the batch_bounds array
batch_index = 0
# Distribution of the batches amongst all worker
nr_workers.times do |process_index|
# Adjust the batch remaining
remaining += per_process
# We take only the integer part to process full batches
num_to_process = remaining.to_i
# We distribute the batch to the current worker
num_to_process.times do
# batch_bounds[batch_index]...batch_bounds[batch_index+1] corresponds to the rows to process in the current batch
# send batch_bounds[batch_index]...batch_bounds[batch_index+1] to process_index
puts "#{process_index} : #{batch_bounds[batch_index]...batch_bounds[batch_index+1]}"
# We update the batch index
batch_index += 1
end
# We remove the batch we processed of the remaining batch to process
remaining -= num_to_process
end
# Just in case
if batch_index == batch_bounds.size - 2
# process the remaining batch
puts "#{nr_workers-1} : #{batch_bounds[batch_index]...batch_bounds[batch_index+1]}"
end
这段代码看起来有点复杂,但最终目标是平衡批次到工人,并确保如果最后一批不是 50_000 行的批次,我们会向最后一个工人发送一个更小的批次。
使用此代码,如果批次多于工人,则某些工人将获得 2 个或更多批次,如果批次少于工人,则某些工人根本不会获得批次。
推荐阅读
- scala - 我看不到使用 Spark 和 Scala 保存的 .csv 文件
- javascript - 随机化我的画廊图像
- python - 从网站上的表中获取数据
- maven - Grails 无法启动,卡在 postProcessBeanFactory
- uwp - UWP 中的动画
- javascript - nodejs中分配的全局变量问题
- jquery - 如何在 JsonResult 中按降序对数据进行排序?
- angular - 角度中 sharedVar 中的模板问题
- python - Scrapy CrawlSpider 抓取页面时遵循什么顺序?
- tomcat - Tomcat 因 StringCache.findClosest NPE 崩溃