首页 > 解决方案 > 在过滤器和 GroupBy 计算期间在 PySpark 中出现内存错误

问题描述

这是错误: 作业因阶段故障而中止:阶段 37.0 中的任务 12 失败 4 次,最近一次失败:阶段 37.0 中丢失任务 12.3(TID 325、10.139.64.5、执行程序 20):ExecutorLostFailure(执行程序 20 退出由正在运行的任务之一)原因:远程 RPC 客户端已解除关联。可能是由于容器超过阈值或网络问题。检查驱动程序日志以获取 WARN 消息*

那么有没有其他更有效的方法来应用这些功能而不会导致内存不足错误?我有数十亿的数据要计算。

Input Dataframe on which filtering is to be done: 
+------+-------+-------+------+-------+-------+
|Pos_id|level_p|skill_p|Emp_id|level_e|skill_e|
+------+-------+-------+------+-------+-------+
|     1|      2|      a|   100|      2|      a|
|     1|      2|      a|   100|      3|      f|
|     1|      2|      a|   100|      2|      d|
|     1|      2|      a|   101|      4|      a|
|     1|      2|      a|   101|      5|      b|
|     1|      2|      a|   101|      1|      e|
|     1|      2|      a|   102|      5|      b|
|     1|      2|      a|   102|      3|      d|
|     1|      2|      a|   102|      2|      c|
|     2|      2|      d|   100|      2|      a|
|     2|      2|      d|   100|      3|      f|
|     2|      2|      d|   100|      2|      d|
|     2|      2|      d|   101|      4|      a|
|     2|      2|      d|   101|      5|      b|
|     2|      2|      d|   101|      1|      e|
|     2|      2|      d|   102|      5|      b|
|     2|      2|      d|   102|      3|      d|
|     2|      2|      d|   102|      2|      c|
|     2|      4|      b|   100|      2|      a|
|     2|      4|      b|   100|      3|      f|
+------+-------+-------+------+-------+-------+

过滤代码:

from pyspark.sql.types import IntegerType
from pyspark.sql.functions import udf
from pyspark.sql import functions as sf
function = udf(lambda item, items: 1 if item in items else 0, IntegerType())
df_result = new_df.withColumn('result', function(sf.col('skill_p'), sf.col('skill_e')))

df_filter = df_result.filter(sf.col("result") == 1)
df_filter.show()

res = df_filter.groupBy("Pos_id", "Emp_id").agg(
      sf.collect_set("skill_p").alias("SkillsMatch"),
      sf.sum("result").alias("SkillsMatchedCount"))
res.show()

这需要在数十亿行上完成。

标签: dataframefiltergroup-bypysparkout-of-memory

解决方案


推荐阅读