首页 > 解决方案 > 同步线程启动

问题描述

我正在运行一些代码(简化,但仍然破坏下面的版本),在等待第一次切换的 ~1/3000 执行中失败。应该发生的是:

不幸的是,它失败了thread[0]-cond.wait以超时结束并引发异常。我将如何同步它,确保cond_main不会过早发出信号?

理想情况下,我想从主线程传递一个锁定的互斥锁并在第一个生成的线程中解锁它,但是 Ruby 需要在同一个线程中解锁互斥锁 - 所以这不起作用。

自包含的复制器(本身没有多大意义,但实际工作被剥离了):

def run_test
  mutex     = Mutex.new
  cond      = ConditionVariable.new
  cond_main = ConditionVariable.new
  threads   = []

  t1_done = false
  t2_done = false

  threads << Thread.new do
    mutex.synchronize do
      # this needs to happen first
      cond_main.signal
      cond.wait(mutex, 2)
      raise 'timeout waiting for switch' if !t2_done

      # some work
      t1_done = true
      cond.signal
    end
  end
  cond_main.wait(Mutex.new.lock, 2)

  threads << Thread.new do
    mutex.synchronize do
      cond.signal
      # some work
      t2_done = true
      cond.wait(mutex, 2)
      raise 'timeout waiting for switch' if !t1_done
    end
  end

  threads.map(&:join)
end

5000.times { |x|
  puts "Run #{x}"
  run_test
}

在 Ruby 2.5.3 上测试

标签: rubymultithreading

解决方案


设置一个 while 块以在第二个线程完成时停止等待(在此处查看更多信息):

def run_test
  mutex     = Mutex.new
  cond      = ConditionVariable.new
  cond_main = ConditionVariable.new
  threads   = []

  spawned = false

  t1_done = false
  t2_done = false

  threads << Thread.new do
    mutex.synchronize do
      while(!spawned) do
        cond.wait(mutex, 2)
      end
      raise 'timeout waiting for switch' if !t2_done

      # some work
      t1_done = true
      cond.signal
    end
  end

  threads << Thread.new do
    mutex.synchronize do
      spawned = true
      cond.signal
      # some work
      t2_done = true
      cond.wait(mutex, 2)
      raise 'timeout waiting for switch' if !t1_done
    end
  end

  threads.map(&:join)
end

50000.times { |x| 
  puts x 
  run_test 
}

或者,使用计数信号量,我们可以为线程分配一些优先级:

require 'concurrent-ruby'

def run_test
  mutex     = Mutex.new
  sync      = Concurrent::Semaphore.new(0)
  cond      = ConditionVariable.new
  cond_main = ConditionVariable.new
  threads   = []

  t1_done = false
  t2_done = false

  threads << Thread.new do
    mutex.synchronize do
      sync.release(1)
      # this needs to happen first
      cond.wait(mutex, 2)
      raise 'timeout waiting for switch' if !t2_done

      # some work
      t1_done = true
      cond.signal
    end
  end

  threads << Thread.new do
    sync.acquire(1)
    mutex.synchronize do
      cond.signal
      # some work
      t2_done = true
      cond.wait(mutex, 2)
      raise 'timeout waiting for switch' if !t1_done
    end
  end

  threads.map(&:join)
end

50000.times { |x| 
  puts x 
  run_test 
}

我更喜欢第二种解决方案,因为它允许您控制线程的顺序,尽管感觉有点脏。

出于好奇,在 Ruby 2.6 上,您的代码似乎没有引发异常(经过测试 > 10M 运行)。


推荐阅读