scala - Scala 流和 ExecutionContext 问题
问题描述
我是Scala的新手,我在作业中遇到了一些问题:我想构建一个可以执行 3 个主要任务的流类:过滤器、映射和 forEach。我s data is an array of elements. Each of the 3 main tasks should run in 2 different threads on my stream
的流数组。此外,我需要将动作的逻辑及其实际运行分为两个不同的部分。首先在流中声明所有任务,并且只有在我运行时stream.run()
我才希望发生实际操作。
我的代码:
class LearningStream[A]() {
val es: ExecutorService = Executors.newFixedThreadPool(2)
val ec = ExecutionContext.fromExecutorService(es)
var streamValues: ArrayBuffer[A] = ArrayBuffer[A]()
var r: Runnable = () => "";
def setValues(streamv: ArrayBuffer[A]) = {
streamValues = streamv;
}
def filter(p: A => Boolean): LearningStream[A] = {
var ls_filtered: LearningStream[A] = new LearningStream[A]()
r = () => {
println("running real filter..")
val (l,r) = streamValues.splitAt(streamValues.length/2)
val a:ArrayBuffer[A]=es.submit(()=>l.filter(p)).get()
val b:ArrayBuffer[A]=es.submit(()=>r.filter(p)).get()
ms_filtered.setValues(a++b)
}
return ls_filtered
}
def map[B](f: A => B): LearningStream[B] = {
var ls_map: LearningStream[B] = new LearningStream[B]()
r = () => {
println("running real map..")
val (l,r) = streamValues.splitAt(streamValues.length/2)
val a:ArrayBuffer[B]=es.submit(()=>l.map(f)).get()
val b:ArrayBuffer[B]=es.submit(()=>r.map(f)).get()
ls_map.setValues(a++b)
}
return ls_map
}
def forEach(c: A => Unit): Unit = {
r=()=>{
println("running real forEach")
streamValues.foreach(c)}
}
def insert(a: A): Unit = {
streamValues += a
}
def start(): Unit = {
ec.submit(r)
}
def shutdown(): Unit = {
ec.shutdown()
}
}
我的主要:
def main(args: Array[String]): Unit = {
var factorial=0
val s = new LearningStream[String]
s.filter(str=>str.startsWith("-")).map(s=>s.toInt*(-1)).forEach(i=>factorial=factorial*i)
for(i <- -5 to 5){
s.insert(i.toString)
}
println(s.streamValues)
s.start()
println(factorial)
}
主要打印过滤器的输出并且阶乘没有改变(仍然是1)。我在这里想念什么?
解决方案
我的解决方案:如果您想获得提示而不是真正的解决方案,@Levi Ramsey 在评论中留下了一些很好的提示。
第一个问题:只有一个命令(过滤器)运行而另一个没有。解决方案:通过以下方式向每个命令的可运行对象插入对下一个流的调用:
ec.submit(ms_map.r)
为了能够关闭所有会话,我们需要在类中添加另一个 LearningStream 数据成员。但是我们不能只添加一个常规的 LearningStream 对象,因为它依赖于参数 [A]。因此,我实现了一个具有 close 功能的特征,并且我的数据成员属于该特征类型。
推荐阅读
- python - 如何更新一个只有一个字段
- javascript - Promise.reject() 返回的值与 Promise.resolve() 不同吗?
- python - 在 MSAL Python 库中,ConfidentialClientApplication 及其底层 TokenCache 对象是线程安全的吗?
- javascript - Javascript 在渲染代码之前从 DOM 中删除项目
- reactjs - React:在使用上下文提供程序之前在上下文提供程序中运行异步操作
- flutter - 如何使用 SpeedDial 制作工作按钮?
- java - 动态地希望从管理员端更新编辑文本中的文本,并应在用户端反映更新的文本
- django - 在 django 中设置排序以使用特定于语言的顺序
- caching - touch(): Utime failed: 缓存清除权限被拒绝
- postgresql - HQL - 检查数组是否包含值