dataframe - 在 Spark 中重新分区更改数据帧的行顺序
问题描述
在应用 .repartition 函数后,我想了解我的数据框发生了什么。如果我的原始数据框是:
+--------+------+--------------+-------+-----+
|integers|floats|integer_arrays|letters|nulls|
+--------+------+--------------+-------+-----+
| 1| -1.0| [1, 2]| a| 1|
| 2| 0.5| [3, 4, 5]| b| null|
| 3| 2.7| [6, 7, 8, 9]| c| 2|
+--------+------+--------------+-------+-----+
我跑:
df.repartition(10).show()
生成的数据框具有不同顺序的行:
+--------+------+--------------+-------+-----+
|integers|floats|integer_arrays|letters|nulls|
+--------+------+--------------+-------+-----+
| 3| 2.7| [6, 7, 8, 9]| c| 2|
| 2| 0.5| [3, 4, 5]| b| null|
| 1| -1.0| [1, 2]| a| 1|
+--------+------+--------------+-------+-----+
为什么行的顺序会改变?
具有 3 行被分成 10 个分区的数据框实际上发生了什么?
我可以看到它分配的分区吗?
谢谢您的帮助。
解决方案
您的初始 DataFrame 的行分布在不同的分区上。当您调用show
行的子集时,将从分区中获取并传递给驱动程序,然后驱动程序将以表格格式显示它们。
要查看您的行分配到的分区,请使用 pyspark sql 函数spark_partition_id()
:
>>> from pyspark.sql.functions import spark_partition_id
>>> df0 = spark.range(3)
>>> df1 = df0.withColumn("partition_id_before", spark_partition_id())
>>> df1.show()
+---+-------------------+
| id|partition_id_before|
+---+-------------------+
| 0| 1|
| 1| 2|
| 2| 3|
+---+-------------------+
现在,当您要求重新洗牌时,Spark 将计算每一行的哈希值,并根据该值和洗牌操作中使用的默认分区数,将每一行移动到一个(可能不同的)分区,如下所示:
>>> df2 = df1.repartition(10).withColumn("partition_id_after", spark_partition_id())
>>> df2.show()
+---+-------------------+------------------+
| id|partition_id_before|partition_id_after|
+---+-------------------+------------------+
| 2| 3| 5|
| 0| 1| 6|
| 1| 2| 9|
+---+-------------------+------------------+
一般来说,由于 Spark 是分布式处理的框架,我的建议是不要依赖(感知的)行的位置顺序,而是将 DataFrame 的内容视为行的集合(缺乏顺序的集合,如群论)。像通常这样的函数orderBy
仅用于显示目的,例如从某事物中获取前 N 个结果,然后顺序很重要。在大多数操作中,忽略顺序。
推荐阅读
- c++ - 当整数 cin 语句在上面时,Getline 命令被跳过
- python - 如何在 matplotlib 的堆栈图中显示负值?
- asp.net - 将 Route 参数附加到 href
- java - 如何使用 Java 15 将 JavaFX 15 代码部署到 JAR
- ruby-on-rails - 如何更新 STRIPE CHECKOUT SESSION 属性 RAILS
- php - 在一个循环中管理两个不同的阵列 TWIG
- google-sheets - 如何根据之前的单元格有条件地更改 Google 表格中单元格的颜色?
- php - PHP - 循环遍历 UNION ALL 查询并按类别分隔项目
- c++ - C++ 线程安全的字符串包装器
- neo4j - 如何处理 Neo4j 中的每个 allShortestPaths?