Cannot create a df with the specified schema

Problem description

When I try to create a DataFrame with a schema in the code below, it doesn't work; without a schema, all of the column data gets merged into a single column.

// transformations
val t3 = t1.map { case (a) => (a(1).toInt, a(2)) }
  .reduceByKey((x, y) => x + "," + y)
  .map { case (a, b) => parse(a, b) }

The `parse` function returns `Array[Int]`.

The REPL session is shown here:

t3.collect()
res7: Array[Array[Int]] = Array(Array(100, 1, 1, 0, 0, 0, 2), Array(104, 2, 0, 0, 0, 1, 3))

// schema column names
temp
res11: List[String] = List(id, review, inprogress, notstarted, completed, started, total)

val fields = temp.map(fieldName => StructField(fieldName, IntegerType, nullable = true))
fields: List[org.apache.spark.sql.types.StructField]

// creating schema
val schema = StructType(fields)
schema: org.apache.spark.sql.types.StructType

val df = t3.toDF()
df: org.apache.spark.sql.DataFrame = [value: array<int>]

df.show()
+--------------------+
|               value|
+--------------------+
|[100, 1, 1, 0, 0,...|
|[104, 2, 0, 0, 0,...|
+--------------------+

val df = t3.toDF(schema)
error: type mismatch;

val df = spark.createDataFrame(t3)
<console>:35: error: overloaded method value createDataFrame with alternatives

Expected:  
+---+---------+----------+----------+------+-------+-----+  
| id|completed|inprogress|notstarted|review|started|total|  
+---+---------+----------+----------+------+-------+-----+  
|100|        0|         1|         0|     1|      0|    2|  
|104|        0|         0|         0|     2|      1|    3|  
+---+---------+----------+----------+------+-------+-----+

Tags: scala, apache-spark, apache-spark-sql, rdd

Solution


The RDD[Array[Int]] of parsed data can be converted to an RDD[Row] and then to a DataFrame:

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

val parsedData = Array(Array(100, 1, 1, 0, 0, 0, 2), Array(104,
  2, 0, 0, 0, 1, 3))
val rddAfterParsing = spark.sparkContext.parallelize(parsedData)
// Row's varargs constructor turns each Array[Int] into a Row of seven values
val rddOfRows = rddAfterParsing.map(arr => Row(arr: _*))

val columnNames = Seq("id", "review", "inprogress", "notstarted", "completed", "started", "total")
val fields = columnNames.map(fieldName => StructField(fieldName,
  IntegerType, nullable = true))
val result = spark.createDataFrame(rddOfRows, StructType(fields))

result.show(false)

Output:

+---+------+----------+----------+---------+-------+-----+
|id |review|inprogress|notstarted|completed|started|total|
+---+------+----------+----------+---------+-------+-----+
|100|1     |1         |0         |0        |0      |2    |
|104|2     |0         |0         |0        |1      |3    |
+---+------+----------+----------+---------+-------+-----+
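As a side note, when the row width is small and fixed, an alternative (a sketch, assuming a spark-shell session where `import spark.implicits._` is in scope) is to map each array to a tuple, so that `toDF(colNames: _*)` can name the columns directly without building a `StructType`. The tuple conversion itself is plain Scala:

```scala
// Hypothetical alternative: pattern-match each 7-element Array[Int] into a Tuple7.
val parsedData = Array(Array(100, 1, 1, 0, 0, 0, 2), Array(104, 2, 0, 0, 0, 1, 3))
val tuples = parsedData.map { case Array(a, b, c, d, e, f, g) => (a, b, c, d, e, f, g) }

// In Spark the tuples would then get named columns via:
//   spark.sparkContext.parallelize(tuples)
//     .toDF("id", "review", "inprogress", "notstarted", "completed", "started", "total")
```

All columns are inferred as `IntegerType` either way; the `StructType` route from the answer is the one to use if you need `nullable` or mixed column types.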
