Constructing a StructType from StructFields with complex types

Problem description

I have the following type:

case class RSSEntry(
  source: RSSFeed,
  uri: String,
  title: String,
  links: List[RSSLink],
  content: List[RSSContent],
  description: RSSContent,
  enclosures: List[RSSEnclosure],
  publishedDate: Long,
  updatedDate: Long,
  authors: List[RSSPerson],
  contributors: List[RSSPerson]
)

I use the fields links, title, and publishedDate.

This is the RSSLink type:

case class RSSLink(href: String, title: String)

Now look at the actual code below.

val schema = new StructType()
  .add(StructField("title", StringType, true))

spark.createDataFrame(rowRDD = newsInTrend, schema = schema)
  .show(20, false)

The StructType should consist of 3 fields: title, links.href, and publishedDate, all as String. How do I construct a complex type with these 3 fields?
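For a flat, all-string result like this, one option is to declare the three columns explicitly. A minimal sketch (the column names simply mirror the fields listed above; the nested values are assumed to be pre-converted to strings):

```scala
import org.apache.spark.sql.types.{StringType, StructField, StructType}

// Flat three-column schema: links.href values and publishedDate are
// assumed to have been converted to plain strings before row creation.
val newsSchema: StructType = StructType(Seq(
  StructField("title", StringType, nullable = true),
  StructField("links", StringType, nullable = true),
  StructField("publishedDate", StringType, nullable = true)
))
```

The rows passed to spark.createDataFrame must then carry three string values in the same order.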

Update: here is the updated code.

stream.foreachRDD(rdd => {
  val spark = SparkSession.builder().appName(sc.appName).getOrCreate()
  import spark.sqlContext.implicits._

  case class RSSNews(title: String,
                     links: String,
                     publishedDate: String)

  val newsInTrend =
    rdd.toDF().select("links", "title", "publishedDate").rdd.filter { row =>
      trends_.value.exists(word => row.getAs[String]("title").toLowerCase().contains(word.toLowerCase()))
    }.map(row => RSSNews(
      row.getAs[String]("title"),
      row.getAs[List[RSSLink]]("links").map(_.href).mkString(","),
      new SimpleDateFormat("d-M-y").format(new Date(row.getAs[Long]("publishedDate")))
    ))

  val structType: StructType = Encoders.product[RSSNews].schema

  spark.createDataFrame(newsInTrend, structType)
    .coalesce(1).write.format("json").mode(SaveMode.Append).save("D:/data/spark") //.show(20, false)
  val df1 = spark.read.format("json").load("D:/data/spark")
  df1.coalesce(1).write.format("json").mode(SaveMode.Overwrite).save("D:/data/spark/subdir")
})

Now I cannot create the DataFrame with spark.createDataFrame(newsInTrend, structType), because newsInTrend has type RDD[RSSNews], not RDD[Row]. There is a toDF method, but the compiler says I cannot call toDF on newsInTrend, and I don't know why. Any suggestions?

Tags: scala, apache-spark

Solution


Create a new case class with just the fields you need and derive the schema from it via Encoders.product:

import org.apache.spark.sql.Encoders
import org.apache.spark.sql.types.StructType
import java.text.SimpleDateFormat
import java.util.Date

case class RSSLinks(title: String,
                    links: String,
                    publishedDate: String)

val structType: StructType = Encoders.product[RSSLinks].schema
val seqRSSEntries: Seq[RSSEntry] = ???
seqRSSEntries.map(rss => RSSLinks(
  rss.title,
  rss.links.map(_.href).mkString(","),
  new SimpleDateFormat("yyyy-MM-dd").format(new Date(rss.publishedDate))
))
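The date conversion in the last line is plain JDK code and can be checked in isolation. A small sketch (the epoch-millis value is made up, and the time zone is pinned to UTC so the result is deterministic):

```scala
import java.text.SimpleDateFormat
import java.util.{Date, TimeZone}

// publishedDate in RSSEntry is epoch milliseconds; 0L is 1970-01-01T00:00:00Z.
val publishedDate: Long = 0L
val fmt = new SimpleDateFormat("yyyy-MM-dd")
fmt.setTimeZone(TimeZone.getTimeZone("UTC")) // avoid JVM-default-time-zone surprises
val formatted = fmt.format(new Date(publishedDate)) // "1970-01-01"
```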

Update:

You can use RowFactory to build new rows from the selected values:

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, RowFactory}
import java.text.SimpleDateFormat
import java.util.Date

val newsInTrend: RDD[Row] = rdd.toDF()
  .select("links", "title", "publishedDate")
  .rdd
  .filter { row =>
    trends.exists(word => row.getAs[String]("title").toLowerCase().contains(word.toLowerCase()))
  }
  .map { row =>
    RowFactory.create(
      row.getAs[String]("title"),
      // nested structs come back from a DataFrame row as Seq[Row], not List[RSSLink]
      row.getAs[Seq[Row]]("links").map(_.getAs[String]("href")).mkString(","),
      new SimpleDateFormat("d-M-y").format(new Date(row.getAs[Long]("publishedDate")))
    )
  }

Then creating the RSSNews objects is probably unnecessary.
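Putting it together, a self-contained local sketch (the sample row and the local[*] master are assumptions for illustration; in the streaming job the RDD[Row] comes from the map above):

```scala
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

val spark = SparkSession.builder().appName("rss-demo").master("local[*]").getOrCreate()

// Flat schema matching the three string values produced per row.
val schema = StructType(Seq(
  StructField("title", StringType, nullable = true),
  StructField("links", StringType, nullable = true),
  StructField("publishedDate", StringType, nullable = true)
))

// Hypothetical row standing in for the filtered newsInTrend RDD.
val rows = spark.sparkContext.parallelize(Seq(
  Row("Spark release", "https://example.com/a,https://example.com/b", "18-6-2020")
))

val df = spark.createDataFrame(rows, schema)
df.show(false)
```

Row(...) is the Scala-side equivalent of RowFactory.create, so either works when building the RDD[Row].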

