python - pyspark 内存高效循环将指标列添加到数据帧
问题描述
有没有办法将以下内容转换为利用循环中的 pyspark 并行化的代码for
?
import pyspark.sql.functions as F
my_list_of_integers = list(df_column_of_integers.select('column_name').toPandas()['column_name'])
for my_int in my_list_of_integers:
temp_df = large_df1.filter(large_df1.a_value_column == my_int)
temp_df = temp_df.select("a_key_column")
temp_df = temp_df.withColumn("indicator" + str(my_int), F.lit(1))
large_df2 = large_df2.join(temp_df, on="a_key_column", how="left")
在for
循环 7 次后(目标是 185),代码失败并给出以下错误消息:
org.apache.spark.memory.SparkOutOfMemoryError: Unable to acquire 52 bytes of memory, got 0
我正在使用的系统报告的另一条错误消息建议如何解决该问题:
您的工作超出了内存开销。您的代码可能试图完全在一个执行程序上运行,如果您没有使用 pySpark,就会发生这种情况;例如,如果您使用的是 Pandas 或 UDF。
对于一个简单的工作示例,这里是示例输入和此示例输入的预期输出的可视化:
样本输入:
df_column_of_integers = spark.createDataFrame([10, 11, 13], IntegerType()).toDF('column_name')
df1_key_column = [52, 52, 53, 53, 52, 52]
a_value_column = [9, 13, 10, 11, 12, 10]
large_df1 = sqlContext.createDataFrame(zip(df1_key_column, a_value_column), schema=['a_key_column', 'a_value_column'])
large_df2 = spark.createDataFrame([52, 54, 53], IntegerType()).toDF('a_key_column')
预期输出(即上面简单示例的 large_df2 的最终版本):
+--------------+-------------+-------------+-------------+
| a_key_column| indicator10| indicator11| indicator13|
+--------------+-------------+-------------+-------------+
| 52| 1| NULL| 1|
| 54| NULL| NULL| NULL|
| 53| 1| 1| NULL|
+--------------+-------------+-------------+-------------+
实际上,我的 df_column_of_integers 有 185 个条目。large_df1 在循环的第一步被过滤之前有 8200 万行和 2 列,for
在该过滤器之后最多有 90 万行。large_df2 以 90 万行和 33 列开始(其中 23 个是整数)。从详细的错误消息来看,似乎是在加入期间发生了错误。然而,我过去在这个系统上加入了更大的数据集,只是没有在for
Pandas 列表中循环,所以它让我认为问题的根源是使用 Pandas 列表,它提示使用单个执行器。因此,我认为可能会有更好的循环技术,有人可能知道。
我尝试将 .foreach 与 lambda 函数一起使用,如下所述:https://sparkbyexamples.com/pyspark/pyspark-loop-iterate-through-rows-in-dataframe/,但我不知道如何添加 large_df1 和 large_df2作为 lambda 函数的附加输入。而且我认为 .map 不会有帮助,因为我不想编辑 my_list_of_integers,只想对其值进行交互。
先感谢您!
解决方案
我解决了我的问题:我将for
循环中的所有内容替换为:
import pyspark.sql.functions as F
large_df1 = large_df1.filter(large_df1.a_value_column.isin(my_list_of_integers))
large_df1 = large_df1.withColumn("my_value". F.lit(1))
large_df1 = large_df1.groupBy("a_key_column").pivot("a_value_column", my_list_of_integers).agg(F.first(F.col("my_value")))
large_df2 = large_df2.join(large_df1, on="a_key_column", how="left")
关键步骤是使用pivot
. 新代码运行方便快捷。
推荐阅读
- sql - 如何使用对 Excel 的查询来获取两个表之间的所有差异
- angular - 在组件初始化时执行 [kendoGridAddCommand]
- android - 从运行 Android 的解码 PNG 图像中获取多线程安全 RGBA 值
- objective-c - 在 UILabel 中显示特殊字符
- python - 使用日期时间数据绘制图形时出现问题
- java - 如何使用junit测试基于kafka的父子项目
- laravel-5 - 在 laravel 5.6 中创建子查询
- android - Apktool/aapt/aapt2 解析 xml 文件中的 unicode 字符时出错
- javascript - 反应 konva 放大和缩小指针位置上的鼠标滚动无法正常工作
- log4cxx - 如何用柯南安装 log4cxx?