scala - 从 Spark RDD 中提取数据,并在 scala 中填充一个元组
问题描述
我在 Hadoop/Spark 框架之上使用 Scala。
实际上我的数据属于这种类型:
RDD[(List[(String, Int)], Long)]
而且,这是此数据湖中前两行的示例:
(List(("COD_LOCALE_PROGETTO",0), ("CUP",1), ("OC_TITOLO_PROGETTO",2), ("OC_SINTESI_PROGETTO",3), ("OC_LINK",4), ("OC_COD_CICLO",5), ("OC_DESCR_CICLO",6), ("OC_COD_TEMA_SINTETICO",7), ("OC_TEMA_SINTETICO",8), ("COD_GRANDE_PROGETTO",9), ("DESCRIZIONE_GRANDE_PROGETTO",10)),0)
(List(("10CAPORTO-POZZUOLI 1",0), ("J86G08000450003",1), ("INTERVENTO C11 2° LOTTO ¿ 1° STRALCIO FUNZIONALE ¿COLLEGAMENTO TRA TANGENZIALE DI NAPOLI (VIA CAMPANA), RETE VIARIA COSTIERA E PORTO DI POZZUOLI""",2), ("INTERVENTO C11 2° LOTTO ¿ 1° STRALCIO FUNZIONALE ¿COLLEGAMENTO TRA TANGENZIALE DI NAPOLI (VIA CAMPANA), RETE VIARIA COSTIERA E PORTO DI POZZUOLI""",3), ("www.opencoesione.gov.it/progetti/10caporto-pozzuoli-1",4), (1,5), ("Ciclo di programmazione 2007-2013",6), ("07",7), ("Trasporti e infrastrutture a rete",8), (" ",9), (" ",10)),1)
在实际情况下,每行持续 194 列,我总共有超过 160 万条记录。
有了这个数据集,我想填充一个新的列表,类型为:
List[(String, Int, Int, Int)]
其中第一个“Int”是每行的每个单个字段(COD_LOCALE_PROGETTO,CUP ...),第二个字段是每个字段的大小(19、3,...)第三个是每个字段的位置,已经编码在变量中,就在字符串之后,最后一个“Int”是整个数据集中每一行的位置。
我试过这个脚本:
| val Dimensione = item._1.size;
| for(i <- 0 until Dimensione){
| ComponentiOpenCoesione :+= (item._1(i)._1.replace("\"","").toString,
| item._1(i)._1.replace("\"","").toString.size,
| item._1(i)._2.toInt,
| item._2.toLong)}
| })
但它失败了,我称之为“ComponentiOpenCoesione”的元组列表没有填充。
最后,这个变量是这样定义的:
var ComponentiOpenCoesione : List[(String, Int, Int, Long)] = List();
有人可以帮助我吗?如何从 RDD 中提取数据并将其加载到列表中?
太感谢了。
解决方案
在 scala 中,返回函数的最后一条语句。在这里,您的函数将不返回任何内容,因为它的最后一条语句是for
不返回任何内容的循环。
要更正它,您只需将其ComponentiOpenCoesione
作为最后一个语句。所以,如果你只是打算映射你RDD[(List[(String, Int)], Long)]
的 get RDD[List[(String, Int, Int, Long)]]
,你的代码应该是:
rdd.map(item => {
var ComponentiOpenCoesione: List[(String, Int, Int, Long)] = List();
val Dimensione = item._1.size;
for (i <- 0 until Dimensione) {
ComponentiOpenCoesione :+= (item._1(i)._1.replace("\"", "").toString,
item._1(i)._1.replace("\"", "").toString.size,
item._1(i)._2.toInt,
item._2.toLong)
}
ComponentiOpenCoesione
})
您可以查看Return in Scala question's answers 以了解值是如何在 scala 中返回的。
推荐阅读
- python - 优化在 python 列表上运行的 cython 函数
- caldav - SOGo:CalDAV 条目未显示在客户端(iOS、macOS、Android)中
- android - CookieManager 没有保存我的 cookie [Android - HttpUrlConnection]
- node.js - Dockerfile:如何在节点高山映像上正确安装 bcrypt?
- sql - 日期差异 = 0 在哪里子句 Oracle?
- javascript - Paho MQTT JS 客户端已连接错误?
- python - 使用 Pandas 计算频率和计数记录
- asp.net-core - 在 Asp.Net Core 下为 RequestId 日志配置 JsNLog 和 NLog
- cmake - 如何在 Windows 的 eigen3 中启用 pkg-config 支持?
- c++ - 在为类实现移动赋值运算符时真的需要自赋值检查吗?