scala - 使用复杂类型的 StructField 构造 StructType
问题描述
有一种类型
case class RSSEntry(
source: RSSFeed,
uri: String,
title: String,
links: List[RSSLink],
content: List[RSSContent],
description: RSSContent,
enclosures: List[RSSEnclosure],
publishedDate: Long,
updatedDate: Long,
authors: List[RSSPerson],
contributors: List[RSSPerson]
)
我使用 field links
, fieldtitle
和 field publishedDate
。
这是RSSLink
类型
case class RSSLink(href: String, title: String)
现在请看下面的实际代码。
val schema = new StructType()
.add(StructField("title", StringType, true))
spark.createDataFrame(rowRDD = newsInTrend, schema = schema)
.show(20, false)
StructType
应由 3 个字段组成title
:links.href
和publishedDate
as String
。如何构造3个字段的复杂类型?
更新 这里是更新的代码。
stream.foreachRDD(rdd =>
{
val spark = SparkSession.builder().appName(sc.appName).getOrCreate()
import spark.sqlContext.implicits._
case class RSSNews( title: String,
links: String,
publishedDate: String)
val newsInTrend =
rdd.toDF().select("links", "title", "publishedDate").rdd.filter(row =>
{
trends_.value.exists(word => row.getAs[String]("title").toLowerCase().contains(word.toLowerCase()))
}).map(row => RSSNews(
row.getAs[String]("title"),
row.getAs[List[RSSLink]]("links").map(_.href).mkString(","),
new SimpleDateFormat("d-M-y").format(new Date(row.getAs[Long]("publishedDate")))
))
val structType: StructType = Encoders.product[RSSNews].schema
spark.createDataFrame(newsInTrend, structType)
.coalesce(1).write.format("json").mode(SaveMode.Append).save("D:/data/spark")//.show(20, false)
val df1 = spark.read.format("json").load("D:/data/spark")
df1.coalesce(1).write.format("json").mode(SaveMode.Overwrite).save("D:/data/spark/subdir")*/
})
现在我不能DataFrame
用代码创建spark.createDataFrame(newsInTrend, structType)
,因为newsInTrend
是类型RDD[RSSNews]
,不是RDD[Row]
。虽然有方法,但编译器toDF
说我不能调用toDF
. newsInTrends
我不知道为什么。有什么建议吗?
解决方案
从您需要的字段中创建一个新的案例类,并从中获取 product.schema:
import org.apache.spark.sql.Encoders
import org.apache.spark.sql.types.StructType
import java.text.SimpleDateFormat
case class RSSLinks(title: String,
links: String,
publishedDate: String)
val structType: StructType = Encoders.product[RSSLinks].schema
val seqRSSEntries: Seq[RSSEntry] = ???
seqRSSEntries.map(rss => RSSLinks(rss.title, rss.links.map(_.href).mkString(","), new SimpleDateFormat("yyyy-MM-dd").format(new Date(rss.publishedDate))))
更新:
您可以使用 RowFactory 从对象数组创建新行:
import org.apache.spark.sql.{Encoders, Row, RowFactory, SaveMode, SparkSession}
val newsInTrend: RDD[Row] = rdd.toDF()
.select("links", "title", "publishedDate")
.rdd.filter {
row =>
trends.exists(word => row.getAs[String]("title").toLowerCase().contains(word.toLowerCase()))
}
.map(
row =>
RowFactory.create(
row.getAs[String]("title"),
row.getAs[List[RSSLink]]("links").map(_.href).mkString(","),
new SimpleDateFormat("d-M-y").format(new Date(row.getAs[Long]("publishedDate")))
)
)
也许创建 RSSNews 对象是不必要的。
推荐阅读
- android - 在 openCV 2.4.9 中保存具有自定义名称的图像
- python - python中的肘部方法
- php - PHP从一个json字符串将多条记录插入mysql
- php - 如何对返回数据laravel的变量进行分页?
- javascript - 使用 map、filter 和 reduce 变换随时间变化的对象数组
- python - 根据给定手牌确定所有合法牌手(Big 2)
- excel - Excel索引/小,将多行减少到一个范围
- javascript - 基于两个/三个下拉值的jquery隐藏文本字段
- vert.x - Vertx - 使用 Router failureHandler 处理异步调用中的错误
- postgis - 是否可以在 PostGIS 中使用假投影?