首页 > 解决方案 > 可以使用火花窗函数 first_value 吗?

问题描述

如下面的代码片段所示,窗口函数 first_value(等价于 first)似乎存在。该功能未在

https://spark.apache.org/docs/3.1.2/sql-ref-functions-builtin.html#window-functions

但是它列在

https://spark.apache.org/docs/latest/api/sql/#first_value

在任何情况下,它似乎都可以用作窗口功能

# first_value window function
from pyspark.sql import SparkSession
import pyspark.sql.functions as f
from pyspark.sql.types import *
import pandas as pd
from time import perf_counter

# get a spark session
spark = SparkSession.builder.appName('learn').getOrCreate()

# create dataset
schema = StructType([
    StructField('c1', StringType(), nullable=True),
    StructField('c2', StringType(), nullable=True),
    StructField('value', DoubleType(), nullable=True),
])
import random
data = [(random.choice(list('ABC')), random.choice(list('abc')), random.random()) for _ in range(100)]
df = spark.createDataFrame(data, schema=schema).drop_duplicates()
df.createOrReplaceTempView('tmp_view')

# execute window function (using first() instead of first_value() gives the same result)
query ="""SELECT c1, first_value(value) OVER (PARTITION BY c1) as f FROM tmp_view"""
res = spark.sql(query)
res.drop_duplicates().show()

所以问题是在文档的窗口函数表中省略列出第一个值/第一个窗口函数的问题吗?

查看数据框 API,似乎 first_value 不存在,而 first 不是窗口函数而是聚合函数

import pyspark.sql.functions as f
f.first?
Signature: f.first(col, ignorenulls=False)
Docstring:
Aggregate function: returns the first value in a group.
The function by default returns the first values it sees. It will return the first non-null
value it sees when ignoreNulls is set to true. If all values are null, then null is returned.
.. versionadded:: 1.3.0

但是,它可以使用 dataframe API 作为窗口函数执行:

from pyspark.sql.window import Window
w = Window.partitionBy("c1")
res = df.withColumn('f', f.first('value').over(w))
res.select(['c1', 'f']).drop_duplicates()
res.show()

它也可以用作聚合函数

data = [('a', 3),
        ('a', 30),
        ('b', 7),
        ('b', 70)
        ]
df = spark.createDataFrame(data, ['nam', 'value'])
res = df.groupby('nam').agg(f.first(f.col('value')))
res.show()

到底是怎么回事?是文档令人困惑还是我的理解有误?

标签: apache-spark-sql

解决方案


检查这个官方api和它的例子,你可以从数据框列定义一个窗口列xxx.over(w)


推荐阅读