How do I split up a large query in a Sidekiq worker?

Problem Description

When we launch a course, we send an email to every user who has opted in to commercial communications. When that happens, our server starts returning 500s because requests time out. We use a Sidekiq worker (BroadcastMessageSendWorker) to create the jobs that send the email we composed. We are on Sidekiq Pro, so we can use its batching feature.

ruby '2.6.5'
gem 'pg', '= 1.2.2'
gem 'rails', '= 5.2.4.1'
gem 'sidekiq', '= 5.2.2'
gem 'sidekiq-pro', '= 4.0.4'

class BroadcastMessageSendWorker
  include Sidekiq::Worker

  def perform(message_guid)
    ActiveRecord::Base.connection_pool.with_connection do
      message = BroadcastMessage.find(message_guid)

      message.with_lock do
        return unless message.pending?

        message.pickup!

        if message.contacts.count == 0
          message.finish!
          return
        end

        batch = Sidekiq::Batch.new
        batch.on(:complete, self.class, 'guid' => message_guid)
        batch.jobs do

          # We can't use `uniq` or `DISTINCT` with find_in_batches because after 1000 records it
          # will start blowing up. Instead, use an in-memory `seen` index
          seen = Set.new

          message.contacts.select(:id).find_in_batches do |contact_batch|
            args = contact_batch.pluck(:id).map do |contact_id| 
              next unless seen.add?(contact_id) # add? returns nil if the object is already in the set

              [message_guid, contact_id]
            end

            Sidekiq::Client.push_bulk('class' => BroadcastMessageDeliverWorker, 'args' => args.compact)
          end
        end

        message.update(batch_id: batch.bid)
      end
    end
  end

  def on_complete(_, options)
    message = BroadcastMessage.find(options['guid'])
    message.finish! if message.sending?
  end
end

We build the in-memory set to make sure we never send a user the same email twice. ScoutAPM tells us that the message.contacts.select(:id) line takes a long time (contacts joins against our users table, so that is somewhat expected).
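For context, the SQL behind that relation can be pulled out of ActiveRecord from a Rails console and fed to EXPLAIN ANALYZE in psql. A minimal sketch, assuming a console session with a test GUID in message_guid:

# Rails console sketch: pull the SQL behind the slow relation so it can be
# run through EXPLAIN ANALYZE directly in psql.
message = BroadcastMessage.find(message_guid)
puts message.contacts.select(:id).to_sql

# ActiveRecord can also print a plain EXPLAIN (without ANALYZE) on its own:
puts message.contacts.select(:id).explain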

I analyzed this query:

Subquery Scan on contacts  (cost=226960.78..230344.36 rows=52055 width=32) (actual time=555.876..692.685 rows=87926 loops=1)
  Filter: (NOT (hashed SubPlan 1))
  ->  CTE Scan on base_contacts  (cost=224403.49..226485.69 rows=104110 width=264) (actual time=523.530..636.032 rows=87926 loops=1)
        CTE base_contacts
          ->  Gather  (cost=189856.23..224403.49 rows=104110 width=306) (actual time=523.525..554.679 rows=87926 loops=1)
                Workers Planned: 2
                Workers Launched: 2
                ->  Parallel Hash Left Join  (cost=188856.23..212992.49 rows=43379 width=306) (actual time=524.667..557.537 rows=29309 loops=3)
                      Hash Cond: (contacts_1.user_id = users.id)
                      Filter: ((contacts_1.user_id IS NULL) OR (users.can_contact AND ((users.managed_subscription_id IS NULL) OR CASE WHEN (users.managed_subscription_id = ANY ('{2,236,690}'::integer[])) THEN false ELSE true END)))
                      Rows Removed by Filter: 12924
                      ->  Parallel Seq Scan on contacts contacts_1  (cost=149225.21..168513.90 rows=47078 width=306) (actual time=272.862..365.114 rows=42233 loops=3)
                            Filter: ((NOT (hashed SubPlan 2)) AND (NOT (hashed SubPlan 3)))
                            Rows Removed by Filter: 108423
                            SubPlan 2
                              ->  Seq Scan on mailkick_opt_outs mailkick_opt_outs_1  (cost=0.00..2147.74 rows=71817 width=22) (actual time=0.044..16.912 rows=71898 loops=3)
                                    Filter: (active AND (list IS NULL))
                                    Rows Removed by Filter: 19576
                            SubPlan 3
                              ->  Nested Loop  (cost=0.43..146644.75 rows=101271 width=4) (actual time=0.098..142.573 rows=325264 loops=3)
                                    ->  Seq Scan on broadcast_messages  (cost=0.00..9.80 rows=1 width=4) (actual time=0.066..0.085 rows=1 loops=3)
                                          Filter: (signature = 'broadcast_message_signature'::text)
                                          Rows Removed by Filter: 63
                                    ->  Index Scan using index_ahoy_messages_on_broadcast_message_id on ahoy_messages  (cost=0.43..144633.82 rows=200113 width=8) (actual time=0.030..107.063 rows=325264 loops=3)
                                          Index Cond: (broadcast_message_id = broadcast_messages.id)
                                          Filter: ((user_type)::text = 'ClassType'::text)
                      ->  Parallel Hash  (cost=36562.34..36562.34 rows=176534 width=9) (actual time=106.742..106.742 rows=141443 loops=3)
                            Buckets: 131072  Batches: 8  Memory Usage: 3168kB
                            ->  Parallel Seq Scan on users  (cost=0.00..36562.34 rows=176534 width=9) (actual time=0.044..74.643 rows=141443 loops=3)
  SubPlan 1
    ->  Seq Scan on mailkick_opt_outs  (cost=0.00..2376.43 rows=72345 width=22) (actual time=0.011..14.309 rows=74331 loops=1)
          Filter: (active AND ((list IS NULL) OR ((list)::text = 'javascript'::text)))
          Rows Removed by Filter: 17143
Planning Time: 0.458 ms
Execution Time: 715.945 ms

The Parallel Seq Scan takes a lot of the time, but I don't know how to speed it up.

My first thought is to split this worker across different ID ranges and query the database at different times, to reduce the load on it. So instead of querying message.contacts, I would query message.contacts.where('id > 1 && id < 10000'), then message.contacts.where('id > 10001 && id < 20000'), and so on until we reach the maximum id.

This feels naive. How can I speed this query up, or spread it out over time?
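To make that idea concrete, here is a rough sketch of what the range-based splitting might look like. BroadcastMessageRangeWorker, the 10,000-row window, and the 5-minute stagger are all placeholders, not something we have tried:

# Hypothetical sketch. Inside the existing worker, after message has been
# looked up, enqueue one job per ID window instead of walking the whole
# relation in a single job:
min_id = message.contacts.minimum(:id)
max_id = message.contacts.maximum(:id)

if min_id
  (min_id..max_id).step(10_000).each_with_index do |start_id, i|
    # Each ID window becomes its own job, scheduled a few minutes apart so
    # the join against users runs in smaller, spaced-out chunks.
    BroadcastMessageRangeWorker.perform_in(i * 5.minutes, message_guid, start_id, start_id + 9_999)
  end
end

class BroadcastMessageRangeWorker
  include Sidekiq::Worker

  def perform(message_guid, start_id, end_id)
    message = BroadcastMessage.find(message_guid)

    args = message.contacts.where(id: start_id..end_id).pluck(:id).map do |contact_id|
      [message_guid, contact_id]
    end

    Sidekiq::Client.push_bulk('class' => BroadcastMessageDeliverWorker, 'args' => args) if args.any?
  end
end

This keeps each query small, but it drops the Sidekiq::Batch completion callback and the in-memory dedup from the worker above, so those would still need to be wired back in somehow.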

I also thought about adding a multicolumn index on users.can_contact and users.managed_subscription_id, but I haven't tried it yet.
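If we do try that index, the migration would presumably look something like the sketch below; the exact column pair (mirroring the filter in the plan above) and the concurrent build are assumptions:

# Hypothetical migration; the column pair mirrors the filter in the plan.
class AddContactabilityIndexToUsers < ActiveRecord::Migration[5.2]
  disable_ddl_transaction!  # needed for algorithm: :concurrently on Postgres

  def change
    add_index :users, [:can_contact, :managed_subscription_id],
              algorithm: :concurrently,
              name: 'index_users_on_can_contact_and_managed_subscription'
  end
end

Whether the planner would actually prefer it over the Parallel Seq Scan depends on how selective those columns are, so it would need to be verified with another EXPLAIN ANALYZE.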

Tags: ruby-on-rails, postgresql, sidekiq
