scala - 如何使用 Spark 2.1 将联合数据帧并行到一个数据帧
问题描述
我希望将数据合并到另一个数据帧的 foreach 循环中的一个数据帧中,但似乎有些数据丢失了。
有什么解决方案可以解决我的情况吗?下面的代码示例:
/** set master("local[*]") 会丢失数据, set master("local[1]") 不会丢失数据。**/
object testParallelizeDF extends App {
import scala.util.Random
import org.apache.spark.sql.SparkSession
//val spark = SparkSession.builder().master("local[1]").getOrCreate()//not lost data
val spark = SparkSession.builder().master("local[*]").getOrCreate()//lost data
import spark.implicits._
val values0 = List(1, 2, 3, 4, 5)
var df0 = values0.toDF
df0.repartition(5)
val values = List(1, 2, 3, 4, 5)
var df = values.toDF
df0.foreachPartition(p => {
p.foreach(r => {
val ran = Random.nextInt()
println(p.hashCode() + "==>" + r.toString() + "==>" + ran)
df = df.union(List(ran).toDF)
})
})
df.collect().foreach(println)}
解决方案
unionDataFrames
默认情况下是并行的,您不应该尝试将其放在执行程序端代码中(它将不起作用)。另外,尽量避免使用有副作用的代码,使用mapPartitions
/map
而不是foreachPartition
/ foreach
:
val randomDF = df0.mapPartitions(p => {
val randomInts = p.map(r => {
val ran = Random.nextInt()
println(p.hashCode() + "==>" + r.toString() + "==>" + ran)
ran
})
randomInts
}).toDF()
df.union(randomDF).collect().foreach(println)}
您还可以生成随机数,例如:
val randomDF = List.fill(df0.count.toInt)(Random.nextInt()).toDF()
推荐阅读
- c - 在 cs50 ide 中等待程序退出时超时
- c++ - c++中std::map的空间复杂度是多少?
- python - nginx 允许 https 内部服务器但不允许外部
- swift - 从不同的文件 SwiftUI 调用核心数据对象
- android - Leak Canary 使用 ViewPager2 检测 TabLayout 的内存泄漏
- sql - 在 Postgres 中合并 - 在冲突中使用插入
- c# - 是否可以在 Unity 中获取精灵表动画的当前帧?
- google-cloud-platform - 使用外部公共 IP(客户端 websocket)连接到 GCP 中的 linux 实例
- sql - 如何从 SQL 的列中删除所有数据?
- python - Beautiful Soup 找到所有找到没有类的某些 div