首页 > 解决方案 > 火花动态分组多列

问题描述

我是 spark 新手,我需要为数据框分组多个列,如下图所示。

root
 |-- Id: integer (nullable = true)
 |-- Traffic Volume Count Location Address: string (nullable = true)
 |-- Street: string (nullable = true)
 |-- Date of Count: string (nullable = true)
 |-- Total Passing Vehicle Volume: integer (nullable = true)
 |-- Vehicle Volume By Each Direction of Traffic: string (nullable = true)
 |-- Latitude: double (nullable = true)
 |-- Longitude: double (nullable = true)
 |-- Location: string (nullable = true)

我需要将两列分组StreetTotal Passing Vehicle Volume下面的代码如下所示:

trafficDf.groupBy("Street","Total Passing Vehicle Volume").count().orderBy("Street").show(100)

但问题是我需要多少列来执行分组我事先不知道这是一个运行时信息,我将以 json 的形式获得,我必须从 json 中提取我需要执行分组的列。

我知道我可以通过在其中运行 SQL 查询来将我dataframe的表转换为表。createOrReplaceTempView但我想知道必须有某种方法无需编写 SQL。



我知道我df.select()可以采取哪些expr()

df.select(expr("Id as new_Id, Street as new_Street")).show()

如果我传入的是同样的东西,groupBy()我会收到错误:

var dynamic_condition="Street, Total Passing Vehicle Volume" // this will be created from json where i'll get column names by looping through runtime info
trafficDf.groupBy(expr(dynamic_condition)).count().show()

错误:

mismatched input ',' expecting <EOF>(line 1, pos 6)

== SQL ==
Street, Total Passing Vehicle Volume

我做错了,我检查了文档,groupBY()我认为它不能expr()作为论据,或者可能是。任何帮助都会得到帮助

注意:我知道可以通过在数据框之上编写 SQL 查询来实现,但我正在尝试其他方式。

标签: scalaapache-spark

解决方案


在上面的示例中,您将列列表传递为String,您需要将其传递为List[String]

来自 API 文档

def groupBy(col1: String, cols: String*): RelationalGroupedDataset

示例代码片段如下所示


def dynamicGroup(df: DataFrame, cols: List[String] ): DataFrame = {
  df.groupBy(cols.head, cols.tail: _*)
}

然后你可以如下调用它

val listOfStrings =  List("A", "B", "C")
val result = dynamicGroup(df, listOfStrings)

推荐阅读