python - 从 Python 列表中向 PySpark DataFrame 添加新列
问题描述
我有一个清单:
dates = [2017, 2018, 2018, 2018, 2019, 2019, 2019, 2020, 2020, 2020]
我尝试将其添加到的数据框长度相同(没有问题)。
我试过了:
df = df.withColumn("YEARS", dates)
Error: Column needs to be col
我也试过:
df = df.withColumn("YEARS", f.lit(dates))
但这也行不通。
我看到了这个问题:如何在 Spark DataFrame 中添加常量列?
但是对于这种情况没有任何用处。
更新:预期的结果是:
df_columns... | dates_from_list
---------------------------------
original_df_data| 2017
original_df_data| 2018
original_df_data| 2018
original_df_data| 2018
original_df_data| 2019
original_df_data| 2019
original_df_data| 2019
original_df_data| 2020
original_df_data| 2020
original_df_data| 2020
解决方案
您的错误来自您需要传递给对象的withColumn
事实Column
。
这里有两种方法可以将日期添加为 Spark 上的新列DataFrame
(使用每个记录的顺序进行连接),具体取决于日期数据的大小。
1)如果你操作一个小数据集
实现它的一种简洁方法是将 UDF 应用于单调递增的 id:
from pyspark.sql.functions import udf, monotonically_increasing_id
df = [...] # 10 records
dates = [2017, 2018, 2018, 2018, 2019, 2019, 2019, 2020, 2020, 2020]
df = df.repartition(1).withColumn(
"YEARS",
udf(lambda id: dates[id])(monotonically_increasing_id()))
df.show()
输出:
+---+-----+
|...|YEARS|
+---+-----+
|...| 2017|
|...| 2018|
|...| 2018|
|...| 2018|
|...| 2019|
|...| 2019|
|...| 2019|
|...| 2020|
|...| 2020|
|...| 2020|
+---+-----+
注意:确保.repartition(1)
生成的 id 是连续的。dates
如果您有另一种方法将每条记录映射到其中的值(如先前构建的 id 列),则可以避免重新分区到单个分区。在这个用例中,正如我们预期的那样 Python 列表对象非常小,这意味着您的 DataFrame 也非常小,因此重新分区并不是什么大问题。
/!\如果数据框和 python 列表太大,为什么它不会缩放:
- 需要对数据帧进行重新分区,从而导致昂贵的洗牌/交换
- 这
.repartition(1)
可能会导致生成一个非常大的分区,处理起来可能非常缓慢(因为它很大,而且如果它不适合执行内存,它可能意味着许多额外的磁盘 I/O 会将 RDD 块溢出到磁盘),或使用OutOfMemoryError
. - python 列表由 udf(通过 lambda 闭包)捕获,这意味着它将被广播到集群的每个执行程序
2) 如果您操作的数据集大小 > 百万行
这是另一种方法,通过使用 pandas 操作 ids 和 dates 列并避免对 Spark 进行任何重新分区,可以更好地处理数百万行DataFrame
。
可以这样做:
import pandas as pd
from pyspark.sql.functions import monotonically_increasing_id
from pyspark.sql.session import SparkSession
spark = SparkSession.builder.getOrCreate()
# some spark DataFrame of length N
df = [...]
# generate monotically increasing ids (not consecutive) without repartitioning the Spark DataFrame.
df = df.withColumn("id", monotonically_increasing_id())
# get generated ids (not consecutive) as a mono-column pandas DataFrame
spark_df_ids = df.select("id").toPandas()
# some python list of length N
dates = [2017, 2018, 2018, 2018, 2019, ..., 2019, 2019, 2020, 2020, 2020]
# build pandas DataFrame from dates
dates_pandas_df = pd.DataFrame(dates, columns=["YEARS"])
# append the id column to the dates in pandas
dates_and_ids_pandas_df = dates_pandas_df.join(spark_df_ids)
# convert from pandas DataFrame to spark DataFrame
dates_and_ids_spark_df = spark.createDataFrame(dates_and_ids_pandas_df)
# Perform the final adding of the dates column to the Spark DataFrame with a join in Spark
df.join(dates_and_ids_spark_df, ["id"]).show()
重要提示:使用 Apache Arrow可以更快地与 pandas 进行转换
推荐阅读
- amazon-web-services - 使用 golang 生成 Cognito Web 令牌
- c - 如何正确清除输入缓冲区(第一个项目)#rewritten 代码
- ubuntu - 无法打开模块文件“hdf5.mod”以在 (1) 处读取:没有这样的文件或目录
- opencv - 使用python和openCV将pdf页面与模式匹配
- javascript - 如何使用 React 和 Sharetribe flex 中的类组件关闭模式
- jhipster - 有 JHipster 针的任何文档吗?
- html - 带有 ngx-printer 的电子应用程序无法打印
- java - 如何在数组列表中添加对象的属性?
- postgresql - 如何在 PostgreSQL 上的数据库上输入密码?
- caching - 如何使用当前月份作为密钥创建 Gitlab CI 管道缓存