dataframe - 在过滤器和 GroupBy 计算期间在 PySpark 中出现内存错误
问题描述
这是错误: 作业因阶段故障而中止:阶段 37.0 中的任务 12 失败 4 次,最近一次失败:阶段 37.0 中丢失任务 12.3(TID 325、10.139.64.5、执行程序 20):ExecutorLostFailure(执行程序 20 退出由正在运行的任务之一)原因:远程 RPC 客户端已解除关联。可能是由于容器超过阈值或网络问题。检查驱动程序日志以获取 WARN 消息*
那么有没有其他更有效的方法来应用这些功能而不会导致内存不足错误?我有数十亿的数据要计算。
Input Dataframe on which filtering is to be done:
+------+-------+-------+------+-------+-------+
|Pos_id|level_p|skill_p|Emp_id|level_e|skill_e|
+------+-------+-------+------+-------+-------+
| 1| 2| a| 100| 2| a|
| 1| 2| a| 100| 3| f|
| 1| 2| a| 100| 2| d|
| 1| 2| a| 101| 4| a|
| 1| 2| a| 101| 5| b|
| 1| 2| a| 101| 1| e|
| 1| 2| a| 102| 5| b|
| 1| 2| a| 102| 3| d|
| 1| 2| a| 102| 2| c|
| 2| 2| d| 100| 2| a|
| 2| 2| d| 100| 3| f|
| 2| 2| d| 100| 2| d|
| 2| 2| d| 101| 4| a|
| 2| 2| d| 101| 5| b|
| 2| 2| d| 101| 1| e|
| 2| 2| d| 102| 5| b|
| 2| 2| d| 102| 3| d|
| 2| 2| d| 102| 2| c|
| 2| 4| b| 100| 2| a|
| 2| 4| b| 100| 3| f|
+------+-------+-------+------+-------+-------+
过滤代码:
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import udf
from pyspark.sql import functions as sf
function = udf(lambda item, items: 1 if item in items else 0, IntegerType())
df_result = new_df.withColumn('result', function(sf.col('skill_p'), sf.col('skill_e')))
df_filter = df_result.filter(sf.col("result") == 1)
df_filter.show()
res = df_filter.groupBy("Pos_id", "Emp_id").agg(
sf.collect_set("skill_p").alias("SkillsMatch"),
sf.sum("result").alias("SkillsMatchedCount"))
res.show()
这需要在数十亿行上完成。
解决方案
推荐阅读
- android - Android 测试:防止异步任务在 Activity onCreate 中触发
- java - 是否可以将方法引用与接口的静态方法一起使用?
- python - 在其他 div 中的 div 之后抓取文本
- java - 在使用 PowerMockito 模拟静态方法时,我收到未完成的存根检测到异常
- r - 有序 logit 拟合中的高阶项
- python - 在键而不是索引上访问字典元素时出现键错误
- google-sheets - 是否仍然“没有 API 来以编程方式管理消费者 googlegroups.com 组”?
- python-3.x - 根据列值将值过滤给其他人
- ruby - 如何像使用 RVM 一样运行 sudo
- azure-devops - 如何在 Azure DevOps Pipelines (YML) 中为任务命名