ruby-on-rails - 如何在 Sidekiq Worker 中拆分大型查询?
问题描述
当我们发布课程时,我们会向所有选择进行商业交流的用户发送电子邮件。发生这种情况时,由于请求超时,我们的服务器开始返回 500。我们使用 sidekiq worker ( BroadcastMessageSendWorker
) 创建将发送我们编写的电子邮件的工作。我们使用 Sidekiq Pro,因此我们可以使用批量处理功能。
ruby '2.6.5'
gem 'pg', '= 1.2.2'
gem 'rails', '= 5.2.4.1'
gem 'sidekiq', '= 5.2.2'
gem 'sidekiq-pro', '= 4.0.4'
class BroadcastMessageSendWorker
include Sidekiq::Worker
def perform(message_guid)
ActiveRecord::Base.connection_pool.with_connection do
message = BroadcastMessage.find(message_guid)
message.with_lock do
return unless message.pending?
message.pickup!
if message.contacts.count == 0
message.finish!
return
end
batch = Sidekiq::Batch.new
batch.on(:complete, self.class, 'guid' => message_guid)
batch.jobs do
# We can't use `uniq` or `DISTINCT` with find_in_batches because after 1000 records it
# will start blowing up. Instead, use an in-memory `seen` index
seen = Set.new({})
message.contacts.select(:id).find_in_batches do |contact_batch|
args = contact_batch.pluck(:id).map do |contact_id|
next unless seen.add?(contact_id) # add? returns nil if the object is already in the set
[message_guid, contact_id]
end
Sidekiq::Client.push_bulk('class' => BroadcastMessageDeliverWorker, 'args' => args.compact)
end
end
message.update(batch_id: batch.bid)
end
end
end
def on_complete(_, options)
message = BroadcastMessage.find(options['guid'])
message.finish! if message.sending?
end
end
我们正在构建一个内存集,以确保我们不会向用户发送 2 封相同的电子邮件。ScoutAPM 告诉我们这message.contacts.select(:id)
条线路需要很长时间(联系人加入了我们的用户表,所以这在某种程度上是意料之中的)。
我分析了这个查询:
Subquery Scan on contacts (cost=226960.78..230344.36 rows=52055 width=32) (actual time=555.876..692.685 rows=87926 loops=1)
Filter: (NOT (hashed SubPlan 1))
-> CTE Scan on base_contacts (cost=224403.49..226485.69 rows=104110 width=264) (actual time=523.530..636.032 rows=87926 loops=1)
CTE base_contacts
-> Gather (cost=189856.23..224403.49 rows=104110 width=306) (actual time=523.525..554.679 rows=87926 loops=1)
Workers Planned: 2
Workers Launched: 2
-> Parallel Hash Left Join (cost=188856.23..212992.49 rows=43379 width=306) (actual time=524.667..557.537 rows=29309 loops=3)
Hash Cond: (contacts_1.user_id = users.id)
Filter: ((contacts_1.user_id IS NULL) OR (users.can_contact AND ((users.managed_subscription_id IS NULL) OR CASE WHEN (users.managed_subscription_id = ANY ('{2,236,690}'::integer[])) THEN false ELSE true END)))
Rows Removed by Filter: 12924
-> Parallel Seq Scan on contacts contacts_1 (cost=149225.21..168513.90 rows=47078 width=306) (actual time=272.862..365.114 rows=42233 loops=3)
Filter: ((NOT (hashed SubPlan 2)) AND (NOT (hashed SubPlan 3)))
Rows Removed by Filter: 108423
SubPlan 2
-> Seq Scan on mailkick_opt_outs mailkick_opt_outs_1 (cost=0.00..2147.74 rows=71817 width=22) (actual time=0.044..16.912 rows=71898 loops=3)
Filter: (active AND (list IS NULL))
Rows Removed by Filter: 19576
SubPlan 3
-> Nested Loop (cost=0.43..146644.75 rows=101271 width=4) (actual time=0.098..142.573 rows=325264 loops=3)
-> Seq Scan on broadcast_messages (cost=0.00..9.80 rows=1 width=4) (actual time=0.066..0.085 rows=1 loops=3)
Filter: (signature = 'broadcast_message_signature'::text)
Rows Removed by Filter: 63
-> Index Scan using index_ahoy_messages_on_broadcast_message_id on ahoy_messages (cost=0.43..144633.82 rows=200113 width=8) (actual time=0.030..107.063 rows=325264 loops=3)
Index Cond: (broadcast_message_id = broadcast_messages.id)
Filter: ((user_type)::text = 'ClassType'::text)
-> Parallel Hash (cost=36562.34..36562.34 rows=176534 width=9) (actual time=106.742..106.742 rows=141443 loops=3)
Buckets: 131072 Batches: 8 Memory Usage: 3168kB
-> Parallel Seq Scan on users (cost=0.00..36562.34 rows=176534 width=9) (actual time=0.044..74.643 rows=141443 loops=3)
SubPlan 1
-> Seq Scan on mailkick_opt_outs (cost=0.00..2376.43 rows=72345 width=22) (actual time=0.011..14.309 rows=74331 loops=1)
Filter: (active AND ((list IS NULL) OR ((list)::text = 'javascript'::text)))
Rows Removed by Filter: 17143
Planning Time: 0.458 ms
Execution Time: 715.945 ms
这Parallel Seq Scan
需要很多时间,但我不知道如何加快速度。
我首先想到的是将这个worker分成不同的ID范围,在不同的时间查询数据库,以减少数据库的负载。因此,message.contacts
我不会查询,而是查询message.contacts.where('id > 1 && id < 10000')
,然后message.contacts.where('id > 10001 && id < 20000')
等等,直到我们达到最大 id。
这感觉很幼稚。如何加快此查询速度或随着时间的推移将其展开?
我也想过在上面添加一个多列索引users.managed_subscription_id
,users.managed_subscription_id
但还没有尝试过。
解决方案
推荐阅读
- ruby-on-rails - rails many_to_many 关联问题
- c++ - 使用 CMake 使用 Network 构建 Qt 项目会导致未定义的引用
- excel - Powerquery命令转换日期格式
- java - 如何使用 Java 反射创建 @Autowired 类的实例
- java - 如何通过 JInput 库将操纵杆单击打印到控制台?
- javascript - Vue 和 Vuex:更改状态时不调用计算属性
- c++ - 您可以在迭代时从 std::forward_list 中删除元素吗?
- c++ - 捕获 C++ 中的致命错误
- android - 如何将 ScrollView 与多个一起使用
Android Studio 中的布局 - python-3.x - 精度停留在某一点