multithreading - 使用 java.util.concurrent 和 cat.effect 自定义同步
问题描述
我有一个非常自定义的非平凡同步的要求,可以用公平的ReentrantLock
和Phaser
. 似乎不可能(没有重要的定制)在fs2
和上实现cats.effect
。
由于需要将所有阻塞操作包装到 aBlocker
这里是代码:
private val l: ReentrantLock = new ReentrantLock(true)
private val c: Condition = l.newCondition
private val b: Blocker = //...
//F is declared on the class level
def lockedMutex(conditionPredicate: Int => Boolean): F[Unit] = blocker.blockOn {
Sync[F].delay(l.lock()).bracket(_ => Sync[F].delay{
while(!conditionPredicate(2)){
c.await()
}
})(_ => Sync[F].delay(l.unlock()))
}
问题:
是否保证包含的代码c.await()
将在Thread
获取/释放的相同中执行ReentrantLock
?
这是一个至关重要的部分,因为如果不是,IllegalMonitorStateException
它将被抛出。
解决方案
使用cat-effect 之类的东西时,您确实不需要担心线程,而是可以在更高级别上描述您的问题。
这应该得到您想要的相同行为,它将运行高优先级作业,直到没有更多可以选择低优先级作业。在完成一个低优先级的工作后,每根光纤将首先检查是否有更多的高优先级工作,然后再尝试再次选择一个低优先级的工作:
import cats.effect.Async
import cats.effect.std.Queue
import cats.effect.syntax.all._
import cats.syntax.all._
import scala.concurrent.ExecutionContext
object HighLowPriorityRunner {
final case class Config[F[_]](
highPriorityJobs: Queue[F, F[Unit]],
lowPriorityJobs: Queue[F, F[Unit]],
customEC: Option[ExecutionContext]
)
def apply[F[_]](config: Config[F])
(implicit F: Async[F]): F[Unit] = {
val processOneJob =
config.highPriorityJobs.tryTake.flatMap {
case Some(hpJob) => hpJob
case None => config.lowPriorityJobs.tryTake.flatMap {
case Some(lpJob) => lpJob
case None => F.unit
}
}
val loop: F[Unit] = processOneJob.start.foreverM
config.customEC.fold(ifEmpty = loop)(ec => loop.evalOn(ec))
}
}
您可以使用customEC
提供您自己的ExecutionContext来控制在后台运行您的纤程的真实线程的数量。
代码可以这样使用:
import cats.effect.{Async, IO, IOApp, Resource}
import cats.effect.std.Queue
import cats.effect.syntax.all._
import cats.syntax.all._
import java.util.concurrent.Executors
import scala.concurrent.ExecutionContext
import scala.concurrent.duration._
object Main extends IOApp.Simple {
override final val run: IO[Unit] =
Resource.make(IO(Executors.newFixedThreadPool(2)))(ec => IO.blocking(ec.shutdown())).use { ec =>
Program[IO](ExecutionContext.fromExecutor(ec))
}
}
object Program {
private def createJob[F[_]](id: Int)(implicit F: Async[F]): F[Unit] =
F.delay(println(s"Starting job ${id} on thread ${Thread.currentThread.getName}")) *>
F.delay(Thread.sleep(1.second.toMillis)) *> // Blocks the Fiber! - Only for testing, use F.sleep on real code.
F.delay(println(s"Finished job ${id}!"))
def apply[F[_]](customEC: ExecutionContext)(implicit F: Async[F]): F[Unit] = for {
highPriorityJobs <- Queue.unbounded[F, F[Unit]]
lowPriorityJobs <- Queue.unbounded[F, F[Unit]]
runnerFiber <- HighLowPriorityRunner(HighLowPriorityRunner.Config(
highPriorityJobs,
lowPriorityJobs,
Some(customEC)
)).start
_ <- List.range(0, 10).traverse_(id => highPriorityJobs.offer(createJob(id)))
_ <- List.range(10, 15).traverse_(id => lowPriorityJobs.offer(createJob(id)))
_ <- F.sleep(5.seconds)
_ <- List.range(15, 20).traverse_(id => highPriorityJobs.offer(createJob(id)))
_ <- runnerFiber.join.void
} yield ()
}
应该产生这样的输出:
Starting job 0 on thread pool-1-thread-1
Starting job 1 on thread pool-1-thread-2
Finished job 0!
Finished job 1!
Starting job 2 on thread pool-1-thread-1
Starting job 3 on thread pool-1-thread-2
Finished job 2!
Finished job 3!
Starting job 4 on thread pool-1-thread-1
Starting job 5 on thread pool-1-thread-2
Finished job 4!
Finished job 5!
Starting job 6 on thread pool-1-thread-1
Starting job 7 on thread pool-1-thread-2
Finished job 6!
Finished job 7!
Starting job 8 on thread pool-1-thread-1
Starting job 9 on thread pool-1-thread-2
Finished job 8!
Finished job 9!
Starting job 10 on thread pool-1-thread-1
Starting job 11 on thread pool-1-thread-2
Finished job 10!
Finished job 11!
Starting job 15 on thread pool-1-thread-1
Starting job 16 on thread pool-1-thread-2
Finished job 15!
Finished job 16!
Starting job 17 on thread pool-1-thread-1
Starting job 18 on thread pool-1-thread-2
Finished job 17!
Finished job 18!
Starting job 19 on thread pool-1-thread-1
Starting job 12 on thread pool-1-thread-2
Finished job 19!
Starting job 13 on thread pool-1-thread-1
Finished job 12!
Starting job 14 on thread pool-1-thread-2
Finished job 13!
Finished job 14!
感谢 Gavin Bisesi (@Daenyth)将我最初的想法提炼成这个!
此处提供完整代码。
推荐阅读
- vb.net - ContextSwitchDeadlock 异常
- sql - SQL 联合 2 表与一个表的联合,其中一个表的列为空
- sql - 如何根据 Rails 中的关联模型设置默认值?
- php - 无法在核心 php 中使用 MD5 加密密码登录
- python - Dataframe to_parquet:parquet列顺序会影响文件大小吗?
- javascript - 将鼠标悬停在独立的 html 表格行上时,在散景图中渲染 BoxAnnotation
- sql - 甲骨文 1 = 意义
- amazon-web-services - WSO2 Identity Server - 使用 AWS SES 发送租户特定的电子邮件
- regex - 如何使用正则表达式指出段落中的密码字段
- javascript - 如何创建全屏覆盖搜索栏 - reactjs