scala - 在不使用 collect 的情况下有效地迭代 spark 数据帧
问题描述
我有一个包含 2000 万条记录的巨大数据框。我需要迭代数据框 df1 并逐行读取每一行,并根据 df1 的列值构造另外两个数据框 df2 和 df3 作为输出。
输入 - df1 有 20 列和 2000 万条记录。输出 -df2 有 4 列,将根据 df1 中的列值创建 2000 万条记录。输出 - df3 有 20 列,将根据 df1 中的列值创建 500-8 亿条记录。
目前的做法——
//EMI Transaction Schema
val emiDFSchema = StructType(
StructField("AS_OF_DATE",StringType, false)::
StructField("EST_END_DT",StringType, false)::
StructField("EFF_DT",StringType, false)::
StructField("NET_PRCPL_AMT",DecimalType, false)::
...
...
Nil)
val emiDFSchema1 = StructType(
StructField("AS_OF_DATE",StringType, false)::
StructField("EST_END_DT",StringType, false)::
StructField("EFF_DT",StringType, false)::
StructField("NET_PRCPL_AMT",DecimalType, false)::
...
...
Nil)
val rwList = new ListBuffer[Row]()
val rwList1 = new ListBuffer[Row]()
df1.collect().map{row=>
.....
//calculation of attributes based on columns of df1
......
//creating Row object for df2 with all calculated attributes
val row = Row(attr1,attr2,....attr20)
rwList+=(row)
for(i<-1 to n){
...
//calculation of attributes based on columns of df1
...
//creating Row object for df3 with all calculated attributes
val row1 = Row(attr1,attr2,....attr20)
rwList1+=(row1)
}
}
val emiDF1 = spark.createDataFrame(spark.sparkContext.parallelize(rwList),emiSchema)
val emiDF2 = spark.createDataFrame(spark.sparkContext.parallelize(rwList1),emiSchema1)
由于 df1 很大,因此在其上执行 collect().map 会花费大量时间。您能否建议一种在更短的时间内有效地迭代 df1 的替代方法?
---Spark v2.4 ---Scala
解决方案
val df11 = df1.selectExpr(...)
推荐阅读
- javascript - 使用 npm-start 为 React 应用程序启动开发服务器时出错
- c# - EF 代码优先 - 在 db 中列出类型属性表名称
- ruby - 我无法在 macOS High Sierra 上安装 Jekyll。bundle 安装在 ffi 时停止 make 错误
- ios - SwiftUI - 预览画布给出协调安装错误
- python - 如何从 Pool.starmap_async() 中获取结果?
- r - group_vars 中的错误:缺少参数“x”,没有默认值
- arduino - arduino 开关控制 - 三路输出
- windows - 解密消息因访问冲突而失败
- javascript - 为什么这个 requestAnimationFrame debounce 示例不起作用?
- angular - Angular RxJs:从 observable 获取最后一个或默认值