首页 > 解决方案 > 使用 Scio 将 SCollection 从 textFile 放入 BigQuery

问题描述

我已经阅读了一些文档textFile,并做了flatMap一个单词,为每个单词添加了一些额外的信息:

val col = sc.textFile(args.getOrElse("input","documents/*"))
    .flatMap(_.split("\\s+").filter(_.nonEmpty))
val mapped = col.map(t => t + ": " + extraInformation())

我目前正在轻松地将其保存为文本

mapped.saveAsTextFile(args.getOrElse("output", "results"))

但我不知道如何将地图保存到 BigQuery 架构。我见过的所有示例都从 BigQuery 创建初始 Scollection,然后将其保存到另一个表中,因此初始集合[TableRow]不是[String].

这里的正确方法是什么?我是否应该调查如何将我的数据转换为 Big Query 可以接受的一种集合?或者我应该尝试进一步调查如何将这个纯文本直接推送到表格中?

标签: scalaspotify-scio

解决方案


我建议@BigQueryType.toTable在案例类上使用注释,如下所示:

import com.spotify.scio.bigquery._

object MyScioJob {

  @BigQueryType.toTable
  case class WordAnnotated(word: String, extraInformation: String)


  def main(args: Array[String]): Unit = {
    // ...job setup logic

    sc.textFile(args.getOrElse("input","documents/*"))
      .flatMap(_.split("\\s+").filter(_.nonEmpty))
      .map(t => WordAnnotated(t, extraInformation())
      .saveAsTypedBigQuery("myProject:myDataset.myTable")
  }
}

Scio wiki上有更多关于此的信息。


推荐阅读