Unexpected frame structure in PySpark window functions

Problem description

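I am trying to understand how the frame is constructed when using window functions, and I am seeing some unexpected behavior. Here is the code (it can be run as is):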

from pyspark.sql import SparkSession
from pyspark.sql.types import StructField, StructType, FloatType, StringType, IntegerType

spark = SparkSession.builder.getOrCreate()


schema = StructType([StructField('product', StringType(), True),
                     StructField('category', StringType(), True),
                     StructField('revenue', IntegerType(), True)])

spark.createDataFrame(

    [
        ("Thin", "Cell Phone", 6000),
        ("Normal", "Tablet", 1500),
        ("Mini", "Tablet", 5500),
        ("Ultra thin", "Cell Phone", 5000),
        ("Very thin", "Cell Phone", 6000),
        ("Big", "Tablet", 2500),
        ("Bendable", "Cell Phone", 3000),
        ("Foldable", "Cell Phone", 3000),
        ("Pro", "Tablet", 4500),
        ("Pro2", "Tablet", 6500),
     ] ,schema=schema ).createOrReplaceTempView("revenues")

# using collect list to print the contents of the current frame
# using min to show the unexpected behavior
spark.sql("""
    select product,
           category,
           revenue,
           collect_list((product, revenue)) over (partition by category order by revenue desc) frame,
           min(revenue) over (partition by category order by revenue desc) as min
    from revenues
""").show(truncate=False)

This is the output:

+----------+----------+-------+-----------------------------------------------------------------------------------------+----+
|product   |category  |revenue|frame                                                                                    |min |
+----------+----------+-------+-----------------------------------------------------------------------------------------+----+
|Pro2      |Tablet    |6500   |[[Pro2, 6500]]                                                                           |6500|
|Mini      |Tablet    |5500   |[[Pro2, 6500], [Mini, 5500]]                                                             |5500|
|Pro       |Tablet    |4500   |[[Pro2, 6500], [Mini, 5500], [Pro, 4500]]                                                |4500|
|Big       |Tablet    |2500   |[[Pro2, 6500], [Mini, 5500], [Pro, 4500], [Big, 2500]]                                   |2500|
|Normal    |Tablet    |1500   |[[Pro2, 6500], [Mini, 5500], [Pro, 4500], [Big, 2500], [Normal, 1500]]                   |1500|
|Thin      |Cell Phone|6000   |[[Thin, 6000], [Very thin, 6000]]                                                        |6000|
|Very thin |Cell Phone|6000   |[[Thin, 6000], [Very thin, 6000]]                                                        |6000|
|Ultra thin|Cell Phone|5000   |[[Thin, 6000], [Very thin, 6000], [Ultra thin, 5000]]                                    |5000|
|Bendable  |Cell Phone|3000   |[[Thin, 6000], [Very thin, 6000], [Ultra thin, 5000], [Bendable, 3000], [Foldable, 3000]]|3000|
|Foldable  |Cell Phone|3000   |[[Thin, 6000], [Very thin, 6000], [Ultra thin, 5000], [Bendable, 3000], [Foldable, 3000]]|3000|
+----------+----------+-------+-----------------------------------------------------------------------------------------+----+

Now the question:

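In this case the window function does not specify any restriction on the frame. I expected the frame to be the same for every row (and to correspond to the full group of values). In practice, I see the frame growing (I am not sure by what criterion). This causes the min function to return an incorrect value: for example, 6500 on the Pro2 row of the "Tablet" category instead of 1500.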

If I remove the order by clause from the window specification, I see the expected behavior:

collect_list((product, revenue)) over (partition by category) frame, 
min(revenue) over (partition by category) as min

This yields:

+----------+----------+-------+-----------------------------------------------------------------------------------------+----+
|product   |category  |revenue|frame                                                                                    |min |
+----------+----------+-------+-----------------------------------------------------------------------------------------+----+
|Normal    |Tablet    |1500   |[[Normal, 1500], [Mini, 5500], [Big, 2500], [Pro, 4500], [Pro2, 6500]]                   |1500|
|Mini      |Tablet    |5500   |[[Normal, 1500], [Mini, 5500], [Big, 2500], [Pro, 4500], [Pro2, 6500]]                   |1500|
|Big       |Tablet    |2500   |[[Normal, 1500], [Mini, 5500], [Big, 2500], [Pro, 4500], [Pro2, 6500]]                   |1500|
|Pro       |Tablet    |4500   |[[Normal, 1500], [Mini, 5500], [Big, 2500], [Pro, 4500], [Pro2, 6500]]                   |1500|
|Pro2      |Tablet    |6500   |[[Normal, 1500], [Mini, 5500], [Big, 2500], [Pro, 4500], [Pro2, 6500]]                   |1500|
|Thin      |Cell Phone|6000   |[[Thin, 6000], [Ultra thin, 5000], [Very thin, 6000], [Bendable, 3000], [Foldable, 3000]]|3000|
|Ultra thin|Cell Phone|5000   |[[Thin, 6000], [Ultra thin, 5000], [Very thin, 6000], [Bendable, 3000], [Foldable, 3000]]|3000|
|Very thin |Cell Phone|6000   |[[Thin, 6000], [Ultra thin, 5000], [Very thin, 6000], [Bendable, 3000], [Foldable, 3000]]|3000|
|Bendable  |Cell Phone|3000   |[[Thin, 6000], [Ultra thin, 5000], [Very thin, 6000], [Bendable, 3000], [Foldable, 3000]]|3000|
|Foldable  |Cell Phone|3000   |[[Thin, 6000], [Ultra thin, 5000], [Very thin, 6000], [Bendable, 3000], [Foldable, 3000]]|3000|
+----------+----------+-------+-----------------------------------------------------------------------------------------+----+

Is my assumption incorrect? If so, what is the mechanism of frame construction in these two cases?

Tested on Spark 3.0.1.

Tags: sql, apache-spark, pyspark, window-functions

Solution


This is expected behavior. The documentation states:

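When ordering is not defined, an unbounded window frame (rowFrame, unboundedPreceding, unboundedFollowing) is used by default. When ordering is defined, a growing window frame (rangeFrame, unboundedPreceding, currentRow) is used by default.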
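To make the two defaults concrete, here is a minimal plain-Python sketch that emulates them on the "Cell Phone" partition from the question. It is only an illustration of the frame semantics, not Spark's implementation; the helper name `default_frames` is hypothetical. Note that a range frame ending at the current row includes every row tied with it on the ordering value, which is why both 6000 rows share the same frame in the output above.

```python
# Plain-Python emulation of the two default window frames for ONE partition.
# Illustration only: `default_frames` is a made-up helper, not a Spark API.

def default_frames(rows, ordered):
    """Return (row, frame) pairs, where rows are (product, revenue) tuples."""
    if not ordered:
        # No ORDER BY: the frame is the whole partition for every row
        # (ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING).
        return [(row, list(rows)) for row in rows]
    # ORDER BY revenue DESC: sort first (stable, so ties keep input order).
    ordered_rows = sorted(rows, key=lambda r: r[1], reverse=True)
    # RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: the frame holds every
    # row whose revenue sorts before or ties with the current row's revenue.
    return [(row, [r for r in ordered_rows if r[1] >= row[1]])
            for row in ordered_rows]


cell_phones = [
    ("Thin", 6000),
    ("Ultra thin", 5000),
    ("Very thin", 6000),
    ("Bendable", 3000),
    ("Foldable", 3000),
]

# min(revenue) evaluated over each row's frame
running_min = [min(r[1] for r in frame)
               for _, frame in default_frames(cell_phones, ordered=True)]
full_min = [min(r[1] for r in frame)
            for _, frame in default_frames(cell_phones, ordered=False)]

print(running_min)  # [6000, 6000, 5000, 3000, 3000]
print(full_min)     # [3000, 3000, 3000, 3000, 3000]
```

If the ordering is needed but the aggregate should still see the whole partition, the frame can be stated explicitly in the window specification, for example `min(revenue) over (partition by category order by revenue desc rows between unbounded preceding and unbounded following)`.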

