scala - Future map 和 foreach 的行为
问题描述
我有以下代码在地图和未来打印当前线程名称
object ConcurrencyTest1 {
def main(args: Array[String]) {
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future
println("main thread: " + Thread.currentThread().getName)
Future {
println("future job: " + Thread.currentThread().getName)
Thread.sleep(1000)
10
}.map { x => {
println("map: " + Thread.currentThread().getName)
Thread.sleep(1000)
x * x
}
}.foreach { x => {
Thread.sleep(1000)
println("foreach: " + Thread.currentThread().getName)
println(x)
}
}
Thread.sleep(5000)
}
}
输出是:
main thread: main
future job: ForkJoinPool-1-worker-5
map: ForkJoinPool-1-worker-5
foreach: ForkJoinPool-1-worker-5
从输出来看,future job、map 和 foreach 都在同一个线程中运行。我想问这个结果是否是确定性的,即它总是会输出相同的结果。或者,它们可能在不同的线程中运行
解决方案
我不认为这是保证...
但是,在您的情况下,我认为这是来自Thread.sleep
调用(当它们完成时,等待处理的下一个 Future 是后续的 map/foreach;其他 Futures 已经在其他可用线程上启动)。
您可以通过删除Thread.sleep
调用来看到这一点:
注意:为了获得更多数据,这会遍历许多 args。
object ConcurrencyTestNoSleep {
def main(args: Array[String]) {
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future
println("main thread: " + Thread.currentThread().getName)
args.map { a =>
Future {
println(s"future job $a: " + Thread.currentThread().getName)
// Thread.sleep(1)
10
}.map { x => {
println(s"map $a: " + Thread.currentThread().getName)
// Thread.sleep(1)
x * x
}
}.foreach { x => {
// Thread.sleep(1)
println(s"foreach $a:" + Thread.currentThread().getName)
// println(x)
}
}
}
Thread.sleep(5000)
}
}
运行时:
@ ConcurrencyTestNoSleep.main(Array[String]("a", "b", "c", "d", "e", "f", "g") )
main thread: main
future job a: scala-execution-context-global-954
map a: scala-execution-context-global-954
foreach a:scala-execution-context-global-955
future job c: scala-execution-context-global-955
map c: scala-execution-context-global-955
foreach c:scala-execution-context-global-955
future job d: scala-execution-context-global-955
map d: scala-execution-context-global-955
foreach d:scala-execution-context-global-955
future job e: scala-execution-context-global-955
map e: scala-execution-context-global-955
foreach e:scala-execution-context-global-955
future job f: scala-execution-context-global-955
map f: scala-execution-context-global-955
foreach f:scala-execution-context-global-955
future job g: scala-execution-context-global-955
map g: scala-execution-context-global-955
foreach g:scala-execution-context-global-955
future job b: scala-execution-context-global-954
map b: scala-execution-context-global-954
foreach b:scala-execution-context-global-954
您可以在第三行中看到,这次"a"
它跨越了两个不同的线程。
但是,随着睡眠,它似乎确实重用了相同的线程:
object ConcurrencyTestSleep {
def main(args: Array[String]) {
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future
println("main thread: " + Thread.currentThread().getName)
args.map { a =>
Future {
println(s"future job $a: " + Thread.currentThread().getName)
Thread.sleep(1)
10
}.map { x => {
println(s"map $a: " + Thread.currentThread().getName)
Thread.sleep(1)
x * x
}
}.foreach { x => {
Thread.sleep(1)
println(s"foreach $a:" + Thread.currentThread().getName)
// println(x)
}
}
}
Thread.sleep(5000)
}
}
这里每个 arg 使用相同的线程(在 map 和 foreach 中):
@ ConcurrencyTestSleep.main(Array[String]("a", "b", "c", "d", "e", "f", "g") )
main thread: main
future job a: scala-execution-context-global-1162
map a: scala-execution-context-global-1162
future job c: scala-execution-context-global-1164
future job b: scala-execution-context-global-1163
future job d: scala-execution-context-global-1165
map b: scala-execution-context-global-1163
map c: scala-execution-context-global-1164
map d: scala-execution-context-global-1165
foreach a:scala-execution-context-global-1162
future job e: scala-execution-context-global-1162
foreach b:scala-execution-context-global-1163
foreach d:scala-execution-context-global-1165
future job g: scala-execution-context-global-1165
foreach c:scala-execution-context-global-1164
map e: scala-execution-context-global-1162
future job f: scala-execution-context-global-1163
map g: scala-execution-context-global-1165
map f: scala-execution-context-global-1163
foreach e:scala-execution-context-global-1162
foreach f:scala-execution-context-global-1162
foreach g:scala-execution-context-global-1165
也就是说,我不会依赖这个!
推荐阅读
- sql-server - 找出两列之间的差异
- javascript - 如何将 target="_blank" 添加到 document.write 中?
- python - 根据某些条件拆分大型文本文件
- python - 连接丢失时,Kafka 消费者继续挂起等待消息
- android - Firebase AuthUI 在 android 上无法正常工作
- java - BAD 请求 400 消耗 multipart/form-data jersey clientresponse java
- php - 大于和小于精确匹配值
- mongoose - MongoDB在Graphql中聚合后查找
- gradle - 如何在gradle中获得组?
- amazon-web-services - AWS Glue 终端节点 - 无法读取错误文件“/var/aws/emr/userData.json”