首页 > 解决方案 > 如何使用 Spark 2.1 将联合数据帧并行到一个数据帧

问题描述

我希望将数据合并到另一个数据帧的 foreach 循环中的一个数据帧中,但似乎有些数据丢失了。

有什么解决方案可以解决我的情况吗?下面的代码示例:

/** set master("local[*]") 会丢失数据, set master("local[1]") 不会丢失数据。**/

object testParallelizeDF extends App {
import scala.util.Random
import org.apache.spark.sql.SparkSession

//val spark = SparkSession.builder().master("local[1]").getOrCreate()//not lost data
val spark = SparkSession.builder().master("local[*]").getOrCreate()//lost data

import spark.implicits._

val values0 = List(1, 2, 3, 4, 5)
var df0 = values0.toDF
df0.repartition(5)

val values = List(1, 2, 3, 4, 5)
var df = values.toDF

df0.foreachPartition(p => {
    p.foreach(r => {
        val ran = Random.nextInt()
        println(p.hashCode() + "==>" + r.toString() + "==>" + ran)
        df = df.union(List(ran).toDF)
    })
})

df.collect().foreach(println)}

标签: scalaapache-sparkapache-spark-sql

解决方案


unionDataFrames默认情况下是并行的,您不应该尝试将其放在执行程序端代码中(它将不起作用)。另外,尽量避免使用有副作用的代码,使用mapPartitions/map而不是foreachPartition/ foreach

val randomDF = df0.mapPartitions(p => {
    val randomInts = p.map(r => {
        val ran = Random.nextInt()
        println(p.hashCode() + "==>" + r.toString() + "==>" + ran)
       ran
    })

   randomInts
}).toDF()

df.union(randomDF).collect().foreach(println)}

您还可以生成随机数,例如:

val randomDF = List.fill(df0.count.toInt)(Random.nextInt()).toDF()

推荐阅读