What is the solution in spark-scala to sort data on a non-key, non-integer column within grouped data?

Problem Description

Sorting product data by product_price within each product category

I have an RDD of products with the columns: (product_id | product_category_id | product_name | product_description | product_price | product_image)

val prdMap = prd.map(r => (r.split(",")(1).toInt, (r.split(",")(4), r.split(",")(0), r.split(",")(2))))
prdMap.take(5).foreach(println)

val groupByCategory = prdMap.groupByKey()
groupByCategory.take(2).foreach(println)

The RDD elements are grouped correctly by product_category_id; after that I need to sort each group's data by product_price in Scala.

If I keep product_price as a String, it does not sort correctly:

groupByCategory.sortBy(_._2).take(2).foreach(println)

Actual result

(36,CompactBuffer(
  (12.99,789,TaylorMade Men's Burner LTD Golf Glove),
  (24.99,791,Hirzl Women's Trust Feel Golf Glove),
  (13.99,790,FootJoy Men's StaCool Golf Glove)))

Expected result

(36,CompactBuffer(
  (12.99,789,TaylorMade Men's Burner LTD Golf Glove),
  (13.99,790,FootJoy Men's StaCool Golf Glove),
  (24.99,791,Hirzl Women's Trust Feel Golf Glove)))

I tried a couple of approaches, creating tuples with product_price as the key so I could sort on it:

  1. Converting product_price to Float
val prdMap2 = prd.map(r => (r.split(",")(1).toInt, (r.split(",")(4).toFloat, (r.split(",")(0), r.split(",")(2)))))
val groupByCategory2 = prdMap2.groupByKey()

prdMap2.groupByKey().sortBy(_._2).take(5).foreach(println)
prdMap2.groupByKey().keyBy(_._2).take(5).foreach(println)

Both keyBy and sortBy fail with an empty-string error:

19/08/11 19:51:29 ERROR executor.Executor: Exception in task 2.0 in stage 300.0 (TID 553)
java.lang.NumberFormatException: empty String
at sun.misc.FloatingDecimal.readJavaFormatString(FloatingDecimal.java:1020)
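
The NumberFormatException indicates that some input rows carry a price field that is not a parsable number (here, an empty string), so the eager .toFloat in the map blows up at execution time. A defensive parse avoids the failure; the sketch below assumes that falling back to 0f for unparseable prices is acceptable:

import scala.util.Try

val prdMapSafe = prd.map { r =>
  val f = r.split(",")
  // fall back to 0f when the price field is empty or malformed
  (f(1).toInt, (Try(f(4).toFloat).getOrElse(0f), f(0), f(2)))
}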
  2. Converting to a DataFrame and then grouping by product_category_id, but after groupBy neither sort nor orderBy works:
val prdDF = prd.map(r => (r.split(",")(1).toInt, r.split(",")(4).toFloat, r.split(",")(0), r.split(",")(2)))
  .toDF("product_category_id", "product_price", "product_id", "product_name")

scala> prdDF.groupBy("product_category_id")
res294: org.apache.spark.sql.GroupedData = org.apache.spark.sql.GroupedData@45172e99

scala> prdDF.groupBy("product_category_id").sort("product_price")
<console>:43: error: value sort is not a member of org.apache.spark.sql.GroupedData

scala> prdDF.groupBy("product_category_id").orderBy("product_price")
<console>:43: error: value orderBy is not a member of org.apache.spark.sql.GroupedData
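
This error is expected: groupBy on a DataFrame returns a GroupedData (RelationalGroupedDataset in newer Spark versions), which only exposes aggregation methods, so per-group ordering has to happen inside an aggregate expression. A sketch of that idea with the named columns above, using Spark's built-in struct/collect_list/sort_array functions:

import org.apache.spark.sql.functions.{collect_list, sort_array, struct}

// sort_array orders the collected structs field by field, so product_price leads
prdDF.groupBy("product_category_id")
  .agg(sort_array(collect_list(struct($"product_price", $"product_id", $"product_name"))).as("products"))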

Questions

  1. What is the solution in spark-scala to sort data on a non-key, non-integer column within grouped data?
  2. How can I sort the data on 2 different non-key, non-int columns in descending order in Scala? (This relates to another problem I am facing.)

I am a beginner with spark-scala; any help would be much appreciated.

Tags: scala, apache-spark, apache-spark-sql

Solution


You can transform the RDD[String] input into an RDD[(K, V)] whose value part leads with the numeric element you want to sort on, as shown below:

val rdd = sc.parallelize(Seq(
  ("36,12.99,789,TaylorMade Men's Burner LTD Golf Glove"),
  ("36,24.99,791,Hirzl Women's Trust Feel Golf Glove"),
  ("36,13.99,790,FootJoy Men's StaCool Golf Glove")
))

import scala.util.{Try, Success, Failure}

val rdd2 = rdd.map{ line =>
  val arr = line.split(",")
  val a0 = Try(arr(0).toInt) match { case Success(i) => i; case Failure(_) => 0 }
  val a1 = Try(arr(1).toDouble) match { case Success(d) => d; case Failure(_) => 0.0 }

  (a0, (a1, arr.tail))
}

rdd2.groupByKey.mapValues( _.toList.sortBy(_._1).map(_._2) ).collect
// res1: Array[(Int, List[Array[String]])] = Array((36, List(
//   Array(12.99, 789, TaylorMade Men's Burner LTD Golf Glove),
//   Array(13.99, 790, FootJoy Men's StaCool Golf Glove),
//   Array(24.99, 791, Hirzl Women's Trust Feel Golf Glove)
// )))
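
For the second question (descending order on two non-key columns), the same pattern works by sorting on a tuple with reversed Orderings; a minimal sketch, assuming the price (Double) and the last array element (the name) are the two sort keys:

rdd2.groupByKey.mapValues { vs =>
  vs.toList
    // descending on (price, name) via explicit reversed Orderings
    .sortBy(t => (t._1, t._2.last))(Ordering.Tuple2(Ordering[Double].reverse, Ordering[String].reverse))
    .map(_._2)
}.collect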

If you are on Spark 2.4+, consider converting the RDD[(K, V)] to a DataFrame and applying array_sort to the grouped array in a groupBy/collect_list aggregation:

import org.apache.spark.sql.functions.{array_sort, collect_list}

val df = rdd2.toDF("c1", "c2")

df.groupBy("c1").agg(array_sort(collect_list($"c2")).as("c2_sorted_list"))
