scala - 当编码时行模式未知时,如何将字符串与行合并以创建新的火花数据帧?
问题描述
我创建了一个函数,它将一行作为输入并给出一个字符串作为输出。我计划将此功能应用于架构彼此不同的各种数据帧。这些数据框很大,每个都有数百万行,但每个数据框都有一个定义的模式
我想创建另一个函数来调用第一个函数,将函数的输出字符串与它发送到函数的行合并,并创建一个新的数据框,它将作为第二个函数的输出。
这两个函数都将在 spark-scala 环境中编写。我对 spark-scala 很陌生,不太确定如何将这些行组合成一个新的数据框
def returnTranformFunctionOutput(inputDataRow: Row, TransformFrame: Array[Row]): String = {
val resultString = "testdata"
resultString
}
def returnOutputDataframe(inputDataframe: DataFrame, TranformFrame: Array[Row]): DataFrame = {
val inputSchema = inputDataframe.schema
val outputSchema = StructType(StructField("outputVal", StringType, true) :: Nil)
val final_schema = StructType((inputSchema ++ outputSchema))
val newDf = inputDataframe.map(row => {
return Row.merge(row,TransformFunctions.returnTranformFunctionOutput(row,TranformFrame))
}),final_schema)
newDf
}
不returnOutputDataframe
编译并给我多个错误no implicits found for parameter evidence$6: Encoder[U_]
,包括type mismatch: Required:Row Found:string
在执行 Row.merge 时。
可以合并字符串和行以创建一个新行,然后可以将其组合成一个新的数据框吗?
解决方案
您正在尝试返回Dataframe
,returnOutputDataframe
但会产生 .map 步骤Dataset
,并且您也在传递模式而不是编码器。您可以转换inputDataframe
为RDD[Row]
,映射值,然后使用带有新模式的 spark.createDataFrame 创建 DF。请参见下面的示例。
val row1 = RowFactory.create("1","2")
val schema1 = new StructType()
.add("c0","string")
.add("c1","string")
val row2 = RowFactory.create("A","B")
val schema2 = new StructType()
.add("c2","string")
.add("c3","string")
val df1 = spark.createDataFrame(sc.parallelize(Seq(row1)),schema1)
df1.show()
val rdd = df1.rdd.map(s => Row.merge(s, row2))
val schema = StructType(schema1 ++ schema2)
val df = spark.createDataFrame(rdd,schema)
df.printSchema()
df.show()
+---+---+
| c0| c1|
+---+---+
| 1| 2|
+---+---+
root
|-- c0: string (nullable = true)
|-- c1: string (nullable = true)
|-- c2: string (nullable = true)
|-- c3: string (nullable = true)
+---+---+---+---+
| c0| c1| c2| c3|
+---+---+---+---+
| 1| 2| A| B|
+---+---+---+---+
推荐阅读
- go - 为什么 Go 中的错误消息不应该以标点符号结尾?
- javascript - chrome 扩展中的 vuex 共享状态
- python - 在 flask-restful 中序列化 UUID 对象
- r - 如何从 R 中的本地文件夹中 rbind 多个数据帧?
- ios - 从用 Swift/Objective-C 编写的 iOS 应用程序回复评论 API
- sqlalchemy - SQLalchemy 在时间序列的 TOP / END / CONTINUATION 插入行,以便为现有表
- java - 如何在 Android Studio 的片段中添加日历?
- c - 如何在 C 语言中使用空格进行行输入
- python - NameError:名称“意思”未定义
- java - 为什么运算符 && 不检查两个条件并终止程序测试