首页 > 解决方案 > 如何正确迭代 Array[String]?

问题描述

我在 scala 中有一个函数,我向它发送参数,我像这样使用它:

val evega = concat.map(_.split(",")).keyBy(_(0)).groupByKey().map{case (k, v) => (k, f(v))}

我的函数 f 是:

val formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd")
implicit val localDateOrdering: Ordering[LocalDate] = Ordering.by(_.toEpochDay)
def f(v: Array[String]): Int = {
  val parsedDates = v.map(LocalDate.parse(_, formatter))
  parsedDates.max.getDayOfYear - parsedDates.min.getDayOfYear}

这是我得到的错误:

 found   : Iterable[Array[String]]
 required: Array[String]

我已经尝试过使用:

val evega = concat.map(_.split(",")).keyBy(_(0)).groupByKey().map{case (k, v) => (k, for (date <- v) f(date))}

但我得到了大量的错误。

为了获得更好的图片, concat 中的数据是:

1974,1974-06-22
1966,1966-07-20
1954,1954-06-19
1994,1994-06-27
1954,1954-06-26
2006,2006-07-04
2010,2010-07-07
1990,1990-06-30
...

它是 RDD[String] 类型。我怎样才能正确地迭代它并从该函数 f 中获得一个 Int?

标签: scalaapache-sparkrdd

解决方案


管道旁边的 RDD 类型是:

  • concat.map(_.split(","))给出一个RDD[Array[String]]
    • 例如Array("1954", "1954-06-19")
  • concat.map(_.split(",")).keyBy(_(0))RDD[(String, Array[String])]
    • 例如("1954", Array("1954", "1954-06-19"))
  • concat.map(_.split(",")).keyBy(_(0)).groupByKey()RDD[(String, Iterable[Array[String]])]
    • 例如Iterable(("1954", Iterable(Array("1954", "1954-06-19"), Array("1954", "1954-06-24"))))

因此,当您map最后时,值的类型是Iterable[Array[String]].

由于您的输入是"1974,1974-06-22",解决方案可能包括将您的keyBy转换替换为map

input.map(_.split(",")).map(x => x(0) -> x(1)).groupByKey().map{case (k, v) => (k, f(v))}

实际上,.map(x => x(0) -> x(1))(而不是.map(x => x(0) -> x)keyBy(_(0))语法糖)将为拆分数组的第二个元素而不是数组本身提供值。因此,RDD[(String, String)]在第二步中给予而不是RDD[(String, Array[String])]


推荐阅读