首页 > 解决方案 > 从 Spark RDD 中提取数据,并在 scala 中填充一个元组

问题描述

我在 Hadoop/Spark 框架之上使用 Scala。

实际上我的数据属于这种类型:

RDD[(List[(String, Int)], Long)]

而且,这是此数据湖中前两行的示例:

(List(("COD_LOCALE_PROGETTO",0), ("CUP",1), ("OC_TITOLO_PROGETTO",2), ("OC_SINTESI_PROGETTO",3), ("OC_LINK",4), ("OC_COD_CICLO",5), ("OC_DESCR_CICLO",6), ("OC_COD_TEMA_SINTETICO",7), ("OC_TEMA_SINTETICO",8), ("COD_GRANDE_PROGETTO",9), ("DESCRIZIONE_GRANDE_PROGETTO",10)),0)

(List(("10CAPORTO-POZZUOLI 1",0), ("J86G08000450003",1), ("INTERVENTO C11 2° LOTTO ¿ 1° STRALCIO FUNZIONALE ¿COLLEGAMENTO TRA TANGENZIALE DI NAPOLI (VIA CAMPANA), RETE VIARIA COSTIERA E PORTO DI POZZUOLI""",2), ("INTERVENTO C11 2° LOTTO ¿ 1° STRALCIO FUNZIONALE ¿COLLEGAMENTO TRA TANGENZIALE DI NAPOLI (VIA CAMPANA), RETE VIARIA COSTIERA E PORTO DI POZZUOLI""",3), ("www.opencoesione.gov.it/progetti/10caporto-pozzuoli-1",4), (1,5), ("Ciclo di programmazione 2007-2013",6), ("07",7), ("Trasporti e infrastrutture a rete",8), (" ",9), (" ",10)),1)

在实际情况下,每行持续 194 列,我总共有超过 160 万条记录。

有了这个数据集,我想填充一个新的列表,类型为:

List[(String, Int, Int, Int)]

其中第一个“Int”是每行的每个单个字段(COD_LOCALE_PROGETTO,CUP ...),第二个字段是每个字段的大小(19、3,...)第三个是每个字段的位置,已经编码在变量中,就在字符串之后,最后一个“Int”是整个数据集中每一行的位置。

我试过这个脚本:

     | val Dimensione = item._1.size;
     | for(i <- 0 until Dimensione){
     | ComponentiOpenCoesione :+= (item._1(i)._1.replace("\"","").toString,
     | item._1(i)._1.replace("\"","").toString.size,
     | item._1(i)._2.toInt,
     | item._2.toLong)}
     | })

但它失败了,我称之为“ComponentiOpenCoesione”的元组列表没有填充。

最后,这个变量是这样定义的:

var ComponentiOpenCoesione : List[(String, Int, Int, Long)] = List();

有人可以帮助我吗?如何从 RDD 中提取数据并将其加载到列表中?

太感谢了。

标签: scalaapache-spark

解决方案


在 scala 中,返回函数的最后一条语句。在这里,您的函数将不返回任何内容,因为它的最后一条语句是for不返回任何内容的循环。

要更正它,您只需将其ComponentiOpenCoesione作为最后一个语句。所以,如果你只是打算映射你RDD[(List[(String, Int)], Long)]的 get RDD[List[(String, Int, Int, Long)]],你的代码应该是:

rdd.map(item => {
  var ComponentiOpenCoesione: List[(String, Int, Int, Long)] = List();
  val Dimensione = item._1.size;
  for (i <- 0 until Dimensione) {
    ComponentiOpenCoesione :+= (item._1(i)._1.replace("\"", "").toString,
      item._1(i)._1.replace("\"", "").toString.size,
      item._1(i)._2.toInt,
      item._2.toLong)
  }
  ComponentiOpenCoesione
})

您可以查看Return in Scala question's answers 以了解值是如何在 scala 中返回的。


推荐阅读