首页 > 解决方案 > 在没有 sql 函数的情况下在 PySpark 中对数据框进行排序

问题描述

我在打印这个查询时遇到了一些麻烦,其中月份按正确顺序排序。

是否有 pyspark 函数命令可以按降序格式化月份列?(不使用 sql 命令)

from pyspark import SparkContext
from pyspark.sql import SQLContext
from operator import add

sc = SparkContext.getOrCreate()
sqlContext = SQLContext(sc)

rows = sc.textFile("data.csv")
data = rows.map(lambda line: line.split(","))
header = data.first()

q = data.filter(lambda line: line != header)\
        .map(lambda x:(x[1], 1))\
        .reduceByKey(add)\
        .sortByKey(True)\
        .collect()

sqlContext.createDataFrame(q, ['Month','Total number of operated flights']).show()

+-----+--------------------------------+
|Month|Total number of operated flights|
+-----+--------------------------------+
|    1|                          621559|
|   10|                          629992|
|   11|                          605149|
|   12|                          614139|
|    2|                          565604|
|    3|                          639209|
|    4|                          614648|
|    5|                          631609|
|    6|                          629280|
|    7|                          648560|
|    8|                          653279|
|    9|                          600187|
+-----+--------------------------------+

标签: pythonsortingapache-sparkpyspark

解决方案


这是因为月份列被视为字符串。有几种方法可以实现正确的顺序。例如,您可以将列转换int为 RDD 调用并sortByKey(False)用于降序:

q = data.filter(lambda line: line != header)\
    .map(lambda x:(int(x[1]), 1))\
    .reduceByKey(add)\
    .sortByKey(False)\
    .collect()

我并不完全符合您的期望,但当然您不一定需要调用collect()以从 RDD 生成 DF - 您可以通过运行来实现:

df = data.filter(lambda line: line != header)\
 .map(lambda x:(int(x[1]), 1))\
 .reduceByKey(add)\
 .sortByKey(False)
 .toDF()

您也可以使用 DataFrame API 进行转换df.withColumn('Month', df['Month'].as(pyspark.sql.types.StringType()),但您已声明不想使用 Spark SQL。


推荐阅读