scala - 将RDD转换为Dataframe
问题描述
我是火花/斯卡拉的新手。我通过从多个路径加载数据在 RDD 下创建了一个。现在我想从中创建数据框以进行进一步的操作。下面应该是数据框的架构
schema[UserId, EntityId, WebSessionId, ProductId]
rdd.foreach(println)
545456,5615615,DIKFH6545614561456,PR5454564656445454
875643,5485254,JHDSFJD543514KJKJ4
545456,5615615,DIKFH6545614561456,PR5454564656445454
545456,5615615,DIKFH6545614561456,PR5454564656445454
545456,5615615,DIKFH6545614561456,PR54545DSKJD541054
264264,3254564,MNXZCBMNABC5645SAD,PR5142545564542515
732543,8765984,UJHSG4240323545144
564574,6276832,KJDXSGFJFS2545DSAS
有没有人请帮帮我....!!!
我已经通过定义模式类和映射相同的 rdd 来尝试相同但得到错误
“ArrayIndexOutOfBoundsException:3”
解决方案
如果您将列视为字符串,则可以使用以下内容创建:
import org.apache.spark.sql.Row
val rdd : RDD[Row] = ???
val df = spark.createDataFrame(rdd, StructType(Seq(
StructField("userId", StringType, false),
StructField("EntityId", StringType, false),
StructField("WebSessionId", StringType, false),
StructField("ProductId", StringType, true))))
请注意,您必须将您的 RDD“映射”到 RDD[Row] 以便编译器允许使用“createDataFrame”方法。对于缺少的字段,您可以在 DataFrame Schema 中将列声明为可为空。
在您的示例中,您使用的是 RDD 方法spark.sparkContext.textFile()。此方法返回一个 RDD[String],这意味着您的 RDD 的每个元素都是一行。但是,你需要一个 RDD[Row]。所以你需要用逗号分割你的字符串,比如:
val list =
List("545456,5615615,DIKFH6545614561456,PR5454564656445454",
"875643,5485254,JHDSFJD543514KJKJ4",
"545456,5615615,DIKFH6545614561456,PR5454564656445454",
"545456,5615615,DIKFH6545614561456,PR5454564656445454",
"545456,5615615,DIKFH6545614561456,PR54545DSKJD541054",
"264264,3254564,MNXZCBMNABC5645SAD,PR5142545564542515",
"732543,8765984,UJHSG4240323545144","564574,6276832,KJDXSGFJFS2545DSAS")
val FilterReadClicks = spark.sparkContext.parallelize(list)
val rows: RDD[Row] = FilterReadClicks.map(line => line.split(",")).map { arr =>
val array = Row.fromSeq(arr.foldLeft(List[Any]())((a, b) => b :: a))
if(array.length == 4)
array
else Row.fromSeq(array.toSeq.:+(""))
}
rows.foreach(el => println(el.toSeq))
val df = spark.createDataFrame(rows, StructType(Seq(
StructField("userId", StringType, false),
StructField("EntityId", StringType, false),
StructField("WebSessionId", StringType, false),
StructField("ProductId", StringType, true))))
df.show()
+------------------+------------------+------------+---------+
| userId| EntityId|WebSessionId|ProductId|
+------------------+------------------+------------+---------+
|PR5454564656445454|DIKFH6545614561456| 5615615| 545456|
|JHDSFJD543514KJKJ4| 5485254| 875643| |
|PR5454564656445454|DIKFH6545614561456| 5615615| 545456|
|PR5454564656445454|DIKFH6545614561456| 5615615| 545456|
|PR54545DSKJD541054|DIKFH6545614561456| 5615615| 545456|
|PR5142545564542515|MNXZCBMNABC5645SAD| 3254564| 264264|
|UJHSG4240323545144| 8765984| 732543| |
|KJDXSGFJFS2545DSAS| 6276832| 564574| |
+------------------+------------------+------------+---------+
使用 rows rdd 您将能够创建数据框。
推荐阅读
- java - 在 @Before Spring AOP 中使用 JoinPoint 调用 Method.invoke() 时“对象不是声明类的实例”
- python - 如何在 python 中计算位置(纬度/经度)和时间数据的多维插值?
- google-analytics - 增强型电子商务:我是否会在过滤产品时推送新的印象列表?
- maven - 如何更改用于部署到不同环境的存储库 ID?
- java - 将字符串理解为类型参数
- html - 如何使用父/子组件通过 NgFor 将 NgClass 应用于单次迭代?
- php - 在 WooCommerce 中添加产品自定义输入文本作为订单项目数据
- regex - 带十进制数和普通数的计算器正则表达式
- javascript - 点击通话:导航被屏蔽
- java - 如何引用已经保存在数据库中的实例?