首页 > 解决方案 > Scala 流和 ExecutionContext 问题

问题描述

我是Scala的新手,我在作业中遇到了一些问题:我想构建一个可以执行 3 个主要任务的流类:过滤器、映射和 forEach。我s data is an array of elements. Each of the 3 main tasks should run in 2 different threads on my stream的流数组。此外,我需要将动作的逻辑及其实际运行分为两个不同的部分。首先在流中声明所有任务,并且只有在我运行时stream.run()我才希望发生实际操作。

我的代码:

class LearningStream[A]() {
  val es: ExecutorService = Executors.newFixedThreadPool(2)
  val ec = ExecutionContext.fromExecutorService(es)
  var streamValues: ArrayBuffer[A] = ArrayBuffer[A]()
  var r: Runnable = () => "";

  def setValues(streamv: ArrayBuffer[A]) = {
    streamValues = streamv;
  }

  def filter(p: A => Boolean): LearningStream[A] = {
    var ls_filtered: LearningStream[A] = new LearningStream[A]()
    r = () => {
      println("running real filter..")
      val (l,r) = streamValues.splitAt(streamValues.length/2)
      val a:ArrayBuffer[A]=es.submit(()=>l.filter(p)).get()
      val b:ArrayBuffer[A]=es.submit(()=>r.filter(p)).get()
      ms_filtered.setValues(a++b)
    }
    return ls_filtered
  }

  def map[B](f: A => B): LearningStream[B] = {
    var ls_map: LearningStream[B] = new LearningStream[B]()
    r = () => {
      println("running real map..")
      val (l,r) = streamValues.splitAt(streamValues.length/2)
      val a:ArrayBuffer[B]=es.submit(()=>l.map(f)).get()
      val b:ArrayBuffer[B]=es.submit(()=>r.map(f)).get()
      ls_map.setValues(a++b)
    }
    return ls_map
  }

  def forEach(c: A => Unit): Unit = {
    r=()=>{
      println("running real forEach")
      streamValues.foreach(c)}
  }

   def insert(a: A): Unit = {
    streamValues += a
  }

  def start(): Unit = {
    ec.submit(r)
  }

   def shutdown(): Unit = {
    ec.shutdown()
  }
}

我的主要:

def main(args: Array[String]): Unit = {
    var factorial=0
    val s = new LearningStream[String]
    s.filter(str=>str.startsWith("-")).map(s=>s.toInt*(-1)).forEach(i=>factorial=factorial*i)

    for(i <- -5 to 5){
      s.insert(i.toString)
    }
    println(s.streamValues)
    s.start()
    println(factorial)
    }

主要打印过滤器的输出并且阶乘没有改变(仍然是1)。我在这里想念什么?

标签: scala

解决方案


我的解决方案:如果您想获得提示而不是真正的解决方案,@Levi Ramsey 在评论中留下了一些很好的提示。

第一个问题:只有一个命令(过滤器)运行而另一个没有。解决方案:通过以下方式向每个命令的可运行对象插入对下一个流的调用:

ec.submit(ms_map.r)

为了能够关闭所有会话,我们需要在类中添加另一个 LearningStream 数据成员。但是我们不能只添加一个常规的 LearningStream 对象,因为它依赖于参数 [A]。因此,我实现了一个具有 close 功能的特征,并且我的数据成员属于该特征类型。


推荐阅读