python - 在没有 sql 函数的情况下在 PySpark 中对数据框进行排序
问题描述
我在打印这个查询时遇到了一些麻烦,其中月份按正确顺序排序。
是否有 pyspark 函数命令可以按降序格式化月份列?(不使用 sql 命令)
from pyspark import SparkContext
from pyspark.sql import SQLContext
from operator import add
sc = SparkContext.getOrCreate()
sqlContext = SQLContext(sc)
rows = sc.textFile("data.csv")
data = rows.map(lambda line: line.split(","))
header = data.first()
q = data.filter(lambda line: line != header)\
.map(lambda x:(x[1], 1))\
.reduceByKey(add)\
.sortByKey(True)\
.collect()
sqlContext.createDataFrame(q, ['Month','Total number of operated flights']).show()
+-----+--------------------------------+
|Month|Total number of operated flights|
+-----+--------------------------------+
| 1| 621559|
| 10| 629992|
| 11| 605149|
| 12| 614139|
| 2| 565604|
| 3| 639209|
| 4| 614648|
| 5| 631609|
| 6| 629280|
| 7| 648560|
| 8| 653279|
| 9| 600187|
+-----+--------------------------------+
解决方案
这是因为月份列被视为字符串。有几种方法可以实现正确的顺序。例如,您可以将列转换int
为 RDD 调用并sortByKey(False)
用于降序:
q = data.filter(lambda line: line != header)\
.map(lambda x:(int(x[1]), 1))\
.reduceByKey(add)\
.sortByKey(False)\
.collect()
我并不完全符合您的期望,但当然您不一定需要调用collect()
以从 RDD 生成 DF - 您可以通过运行来实现:
df = data.filter(lambda line: line != header)\
.map(lambda x:(int(x[1]), 1))\
.reduceByKey(add)\
.sortByKey(False)
.toDF()
您也可以使用 DataFrame API 进行转换df.withColumn('Month', df['Month'].as(pyspark.sql.types.StringType())
,但您已声明不想使用 Spark SQL。
推荐阅读
- oracle-sqldeveloper - 表和数据在所有连接中,而不是我在其中创建的那个,当我建立新连接时,我必须删除具有相同名称的表
- javascript - React 中的 React.component 除了创建我们组件的实例并在其上设置 props 对象之外还有什么作用?
- javascript - 尽管验证成功,但验证对象为空
- impala - 从 long | 中提取字段名称值 使用 REGEXP_EXTRACT 在 Cloudera Impala 中分隔字符串
- reactjs - ReactJS,axios - 使用 react-scripts start 命令从本地被 CORS 策略阻止
- azure - 使用 NiFi 从 Azure 到 Google Cloud Platform 的数据流
- tableau-api - Tableau 以行为基数计算百分比
- javascript - 响应未显示 ID 屏幕?
- amazon-web-services - Dynamodb 设计推荐
- haproxy - 即使很少有后端关闭,haproxy 也允许其他后端