首页 > 解决方案 > 使用 java.util.concurrent 和 cat.effect 自定义同步

问题描述

我有一个非常自定义的非平凡同步的要求,可以用公平的ReentrantLockPhaser. 似乎不可能(没有重要的定制)在fs2和上实现cats.effect

由于需要将所有阻塞操作包装到 aBlocker这里是代码:

private val l: ReentrantLock = new ReentrantLock(true)
private val c: Condition = l.newCondition
private val b: Blocker = //...

//F is declared on the class level
def lockedMutex(conditionPredicate: Int => Boolean): F[Unit] = blocker.blockOn {
  Sync[F].delay(l.lock()).bracket(_ => Sync[F].delay{
    while(!conditionPredicate(2)){
      c.await()
    }
  })(_ => Sync[F].delay(l.unlock()))
}

问题: 是否保证包含的代码c.await()将在Thread获取/释放的相同中执行ReentrantLock

这是一个至关重要的部分,因为如果不是,IllegalMonitorStateException它将被抛出。

标签: multithreadingscalaconcurrencyfunctional-programmingjava.util.concurrent

解决方案


使用cat-effect 之类的东西时,您确实不需要担心线程,而是可以在更高级别上描述您的问题。

这应该得到您想要的相同行为,它将运行高优先级作业,直到没有更多可以选择低优先级作业。在完成一个低优先级的工作后,每根光纤将首先检查是否有更多的高优先级工作,然后再尝试再次选择一个低优先级的工作:

import cats.effect.Async
import cats.effect.std.Queue
import cats.effect.syntax.all._
import cats.syntax.all._

import scala.concurrent.ExecutionContext

object HighLowPriorityRunner {
  final case class Config[F[_]](
      highPriorityJobs: Queue[F, F[Unit]],
      lowPriorityJobs: Queue[F, F[Unit]],
      customEC: Option[ExecutionContext]
  )

  def apply[F[_]](config: Config[F])
                 (implicit F: Async[F]): F[Unit] = {
    val processOneJob =
      config.highPriorityJobs.tryTake.flatMap {
        case Some(hpJob) => hpJob
        case None => config.lowPriorityJobs.tryTake.flatMap {
          case Some(lpJob) => lpJob
          case None => F.unit
        }
      }

    val loop: F[Unit] = processOneJob.start.foreverM

    config.customEC.fold(ifEmpty = loop)(ec => loop.evalOn(ec))
  }
}

您可以使用customEC提供您自己的ExecutionContext来控制在后台运行您的纤程的真实线程的数量。

代码可以这样使用:

import cats.effect.{Async, IO, IOApp, Resource}
import cats.effect.std.Queue
import cats.effect.syntax.all._
import cats.syntax.all._

import java.util.concurrent.Executors
import scala.concurrent.ExecutionContext
import scala.concurrent.duration._

object Main extends IOApp.Simple {
  override final val run: IO[Unit] =
    Resource.make(IO(Executors.newFixedThreadPool(2)))(ec => IO.blocking(ec.shutdown())).use { ec =>
      Program[IO](ExecutionContext.fromExecutor(ec))
    }
}

object Program {
  private def createJob[F[_]](id: Int)(implicit F: Async[F]): F[Unit] =
    F.delay(println(s"Starting job ${id} on thread ${Thread.currentThread.getName}")) *>
    F.delay(Thread.sleep(1.second.toMillis)) *> // Blocks the Fiber! - Only for testing, use F.sleep on real code.
    F.delay(println(s"Finished job ${id}!"))

  def apply[F[_]](customEC: ExecutionContext)(implicit F: Async[F]): F[Unit] = for {
    highPriorityJobs <- Queue.unbounded[F, F[Unit]]
    lowPriorityJobs <- Queue.unbounded[F, F[Unit]]
    runnerFiber <- HighLowPriorityRunner(HighLowPriorityRunner.Config(
      highPriorityJobs,
      lowPriorityJobs,
      Some(customEC)
    )).start
    _ <- List.range(0, 10).traverse_(id => highPriorityJobs.offer(createJob(id)))
    _ <- List.range(10, 15).traverse_(id => lowPriorityJobs.offer(createJob(id)))
    _ <- F.sleep(5.seconds)
    _ <- List.range(15, 20).traverse_(id => highPriorityJobs.offer(createJob(id)))
    _ <- runnerFiber.join.void
  } yield ()
}

应该产生这样的输出:

Starting job 0 on thread pool-1-thread-1
Starting job 1 on thread pool-1-thread-2
Finished job 0!
Finished job 1!
Starting job 2 on thread pool-1-thread-1
Starting job 3 on thread pool-1-thread-2
Finished job 2!
Finished job 3!
Starting job 4 on thread pool-1-thread-1
Starting job 5 on thread pool-1-thread-2
Finished job 4!
Finished job 5!
Starting job 6 on thread pool-1-thread-1
Starting job 7 on thread pool-1-thread-2
Finished job 6!
Finished job 7!
Starting job 8 on thread pool-1-thread-1
Starting job 9 on thread pool-1-thread-2
Finished job 8!
Finished job 9!
Starting job 10 on thread pool-1-thread-1
Starting job 11 on thread pool-1-thread-2
Finished job 10!
Finished job 11!
Starting job 15 on thread pool-1-thread-1
Starting job 16 on thread pool-1-thread-2
Finished job 15!
Finished job 16!
Starting job 17 on thread pool-1-thread-1
Starting job 18 on thread pool-1-thread-2
Finished job 17!
Finished job 18!
Starting job 19 on thread pool-1-thread-1
Starting job 12 on thread pool-1-thread-2
Finished job 19!
Starting job 13 on thread pool-1-thread-1
Finished job 12!
Starting job 14 on thread pool-1-thread-2
Finished job 13!
Finished job 14!

感谢 Gavin Bisesi (@Daenyth)将我最初的想法提炼成这个!


此处提供完整代码


推荐阅读