scala - What is the solution for sorting grouped data on a non-key, non-integer column in spark-scala?
Problem description
Sorting product data by product price within each category.
I have an RDD of products - columns: (product_id | product_category_id | product_name | product_description | product_price | product_image)
val prdMap = prd.map(r=> (r.split(",")(1).toInt,(r.split(",")(4),r.split(",")(0),r.split(",")(2) )))
prdMap.take(5).foreach(println)
val groupByCategory = prdMap.groupByKey()
groupByCategory.take(2).foreach(println)
The RDD elements are grouped correctly by category_id; after that I have to sort the data by product_price in Scala.
If I keep product_price as a String, it does not sort correctly.
groupByCategory.sortBy(_._2).take(2).foreach(println)
Actual result
(36,CompactBuffer(
(12.99,789,TaylorMade Men's Burner LTD Golf Glove),
(24.99,791,Hirzl Women's Trust Feel Golf Glove),
(13.99,790,FootJoy Men's StaCool Golf Glove) )
Expected result
(36,CompactBuffer(
(12.99,789,TaylorMade Men's Burner LTD Golf Glove),
(13.99,790,FootJoy Men's StaCool Golf Glove),
(24.99,791, Hirzl Women's Trust Feel Golf Glove) )
I tried several approaches:
- creating tuples with product_price as the key and sorting on it
- converting product_price to Float
val prdMap2 = prd.map(r=> (r.split(",")(1).toInt,(r.split(",")(4).toFloat,(r.split(",")(0),r.split(",")(2) ))))
val groupByCategory2 = prdMap2.groupByKey()
prdMap2.groupByKey().sortBy(_._2).take(5).foreach(println)
prdMap2.groupByKey().keyBy(_._2).take(5).foreach(println)
Both keyBy and sortBy give an empty-String error:
19/08/11 19:51:29 ERROR executor.Executor: Exception in task 2.0 in stage 300.0 (TID 553)
java.lang.NumberFormatException: empty String
at sun.misc.FloatingDecimal.readJavaFormatString(FloatingDecimal.java:1020)
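(The NumberFormatException above comes from rows whose price field is empty, since "".toFloat throws. A minimal sketch of a guard, using a hypothetical safeFloat helper, that falls back to a default instead of failing:

```scala
import scala.util.Try

// "".toFloat throws java.lang.NumberFormatException: empty String,
// so wrap the conversion and fall back to a default for bad rows.
def safeFloat(s: String, default: Float = 0f): Float =
  Try(s.trim.toFloat).getOrElse(default)

safeFloat("12.99")  // 12.99f
safeFloat("")       // 0.0f
```
)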
- converting to a DataFrame and then doing groupBy on product_category_id, but after the groupBy neither sortBy nor orderBy works:
val prdDF = prd.map(r=> (r.split(",")(1).toInt,r.split(",")(4).toFloat,r.split(",")(0),r.split(",")(2) )).toDF("product_category_id","product_price","product_id","product_name")
scala> prdDF.groupBy("product_category_id")
res294: org.apache.spark.sql.GroupedData = org.apache.spark.sql.GroupedData@45172e99
scala> prdDF.groupBy("product_category_id").sort("product_price")
<console>:43: error: value sort is not a member of org.apache.spark.sql.GroupedData
scala> prdDF.groupBy("product_category_id").orderBy("product_price")
<console>:43: error: value orderBy is not a member of org.apache.spark.sql.GroupedData
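(The errors above are because sort and orderBy are DataFrame methods, not GroupedData methods. For reference, ordering the whole DataFrame by category and then price does work; a sketch, assuming the prdDF defined above:

```scala
// orderBy is available on DataFrame, not on GroupedData:
// sort by category first, then by price within each category.
prdDF.orderBy($"product_category_id", $"product_price").show(false)
```
)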
Questions
- What is the solution for sorting grouped data on a non-key, non-integer column in spark-scala?
- How do I sort data in descending order on 2 different non-key, non-int columns in Scala? (this relates to another problem I am facing)
I am a beginner in spark-scala; any help would be appreciated.
Solution
You can transform the RDD[String] input into an RDD[(K, V)] whose value part leads with the target numeric element to sort by, as shown below:
val rdd = sc.parallelize(Seq(
  "36,12.99,789,TaylorMade Men's Burner LTD Golf Glove",
  "36,24.99,791,Hirzl Women's Trust Feel Golf Glove",
  "36,13.99,790,FootJoy Men's StaCool Golf Glove"
))

import scala.util.{Try, Success, Failure}

val rdd2 = rdd.map { line =>
  val arr = line.split(",")
  val a0 = Try(arr(0).toInt) match { case Success(i) => i; case Failure(_) => 0 }      // product_category_id
  val a1 = Try(arr(1).toDouble) match { case Success(d) => d; case Failure(_) => 0.0 } // product_price
  (a0, (a1, arr.tail))
}

rdd2.groupByKey.mapValues(_.toList.sortBy(_._1).map(_._2)).collect
// res1: Array[(Int, List[Array[String]])] = Array((36, List(
// Array(12.99, 789, TaylorMade Men's Burner LTD Golf Glove),
// Array(13.99, 790, FootJoy Men's StaCool Golf Glove),
// Array(24.99, 791, Hirzl Women's Trust Feel Golf Glove)
// )))
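The same mapValues pattern also covers the second question: sortBy takes an explicit Ordering, so a descending sort on two non-key columns (here product_price, then product_name) is a reversed tuple ordering. A sketch, building on rdd2 above:

```scala
// Sort each group descending by price, then descending by name.
// arr is the tail of the split line, so arr(2) is product_name;
// reversing the tuple Ordering flips both components.
rdd2.groupByKey
  .mapValues(_.toList
    .sortBy { case (price, arr) => (price, arr(2)) }(Ordering[(Double, String)].reverse)
    .map(_._2))
  .collect
```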
If you are on Spark 2.4+, consider converting the RDD[(K, V)] into a DataFrame and applying array_sort to the grouped array in a groupBy/collect_list aggregation:
import org.apache.spark.sql.functions._

val df = rdd2.toDF("c1", "c2")
df.groupBy("c1").agg(array_sort(collect_list($"c2")).as("c2_sorted_list"))
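Note that array_sort only sorts ascending. For the descending case from the second question, sort_array (which takes an asc flag) can be used instead, provided the array's element type is orderable. A sketch, reusing the df above:

```scala
// sort_array with asc = false sorts each collected array descending;
// struct elements compare field by field, with the price field first.
df.groupBy("c1")
  .agg(sort_array(collect_list($"c2"), asc = false).as("c2_sorted_desc"))
```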