python - pyspark 中的 Pandas UDF
问题描述
我正在尝试对 spark 数据框进行一系列观察。基本上我有一个日期列表,我应该为每个组创建缺少的一个。
在 pandas 中reindex
,有 pyspark 中没有的功能。
我试图实现一个熊猫 UDF:
@pandas_udf(schema, functionType=PandasUDFType.GROUPED_MAP)
def reindex_by_date(df):
df = df.set_index('dates')
dates = pd.date_range(df.index.min(), df.index.max())
return df.reindex(dates, fill_value=0).ffill()
这看起来应该做我需要的,但是它失败了这个消息
AttributeError: Can only use .dt accessor with datetimelike values
。我在这里做错了什么?
这里是完整的代码:
data = spark.createDataFrame(
[(1, "2020-01-01", 0),
(1, "2020-01-03", 42),
(2, "2020-01-01", -1),
(2, "2020-01-03", -2)],
('id', 'dates', 'value'))
data = data.withColumn('dates', col('dates').cast("date"))
schema = StructType([
StructField('id', IntegerType()),
StructField('dates', DateType()),
StructField('value', DoubleType())])
@pandas_udf(schema, functionType=PandasUDFType.GROUPED_MAP)
def reindex_by_date(df):
df = df.set_index('dates')
dates = pd.date_range(df.index.min(), df.index.max())
return df.reindex(dates, fill_value=0).ffill()
data = data.groupby('id').apply(reindex_by_date)
理想情况下,我想要这样的东西:
+---+----------+-----+
| id| dates|value|
+---+----------+-----+
| 1|2020-01-01| 0|
| 1|2020-01-02| 0|
| 1|2020-01-03| 42|
| 2|2020-01-01| -1|
| 2|2020-01-02| 0|
| 2|2020-01-03| -2|
+---+----------+-----+
解决方案
案例 1:每个 ID 都有一个单独的日期范围。
我会尽量减少 udf 的内容。在这种情况下,我只会计算 udf 中每个 ID 的日期范围。对于其他部分,我将使用 Spark 原生函数。
from pyspark.sql import types as T
from pyspark.sql import functions as F
# Get min and max date per ID
date_ranges = data.groupby('id').agg(F.min('dates').alias('date_min'), F.max('dates').alias('date_max'))
# Calculate the date range for each ID
@F.udf(returnType=T.ArrayType(T.DateType()))
def get_date_range(date_min, date_max):
return [t.date() for t in list(pd.date_range(date_min, date_max))]
# To get one row per potential date, we need to explode the UDF output
date_ranges = date_ranges.withColumn(
'dates',
F.explode(get_date_range(F.col('date_min'), F.col('date_max')))
)
date_ranges = date_ranges.drop('date_min', 'date_max')
# Add the value for existing entries and add 0 for others
result = date_ranges.join(
data,
['id', 'dates'],
'left'
)
result = result.fillna({'value': 0})
案例 2:所有 id 具有相同的日期范围
我认为这里没有必要使用 UDF。您想要的内容可以以不同的方式存档:首先,您可以获得所有可能的 ID 和所有必要的日期。其次,你交叉加入它们,这将为你提供所有可能的组合。第三,将原始数据左连接到组合上。第四,将出现的空值替换为 0。
# Get all unique ids
ids_df = data.select('id').distinct()
# Get the date series
date_min, date_max = data.agg(F.min('dates'), F.max('dates')).collect()[0]
dates = [[t.date()] for t in list(pd.date_range(date_min, date_max))]
dates_df = spark.createDataFrame(data=dates, schema="dates:date")
# Calculate all combinations
all_comdinations = ids_df.crossJoin(dates_df)
# Add the value column
result = all_comdinations.join(
data,
['id', 'dates'],
'left'
)
# Replace all null values with 0
result = result.fillna({'value': 0})
请注意此解决方案的以下限制:
- 交叉连接可能非常昂贵。可以在这个相关问题中找到解决该问题的一种潜在解决方案。
- 收集语句和 Pandas 的使用导致了不完全并行化的 Spark 转换。
[编辑] 分为两种情况,因为我首先认为所有 ID 都具有相同的日期范围。
推荐阅读
- xml - 使用斜体的 XSLT 恢复信息 XML
- javascript - 如何提交到谷歌表单而不重定向到它的页面
- css - 如何过渡已经有动画的 React 元素
- c - 如果在 main 中添加了元素,则在 C 中查找链表的元素有效,但如果没有,则无效?
- c# - 在 C# 中指定参数属性的默认值
- ios - 将数据从 tableViewCell 传递到另一个 VC
- javascript - 如何从作为道具传递的数组中反应性地删除一个对象,以便它反映在 DOM 中?
- regex - 正则表达式查找指定的值
- c - 为什么多次调用 calloc 会使我的应用程序崩溃?
- python - Termcolor“彩色”输出奇怪的字符