首页 > 解决方案 > 如何使用 map 函数正确并行运行 pyspark 代码

问题描述

我无法在 emr 集群上将以下代码作为 pyspark 代码运行。我实际上是在尝试创建除以 2 个主键的数据组。切片是主键列的每个唯一组合,完整数据是包含整个数据作为数据框对象的数据框。该对象是使用 sqlContext 构建的。以下在并行运行时失败,抱怨 pickle 库出现序列化错误。具体来说_pickle.PicklingError: Could not serialize object: TypeError: can't pickle _thread.RLock objects

完成我想做的事情的最佳方法是什么?

def main(slices, fullData):
    jsons = slices.rdd.map(
         lambda i: handleSlices(i, fullData)).collect()  # Run in parallel
    # jsons = [handleSlices(i, fullData)
    #    for i in slices.collect()]  # run in serial
    return jsons

def handleSlices(row, fullData):
    entries = fullData.filter((col("fullData.vehicle_id") == row.vehicle_id)
                              & (col("fullData.start_time") == row.start_time)).select(
        "fullData.latitude", "fullData.longitude")

    folder = "/playback/" + row.vehicle_id + "/"
    fileName = folder + row.start_time.replace(" ", "_").replace(":", "-")

    return (fileName, entries)

标签: pysparkamazon-emr

解决方案


您的代码中有两个主要问题:

  • Spark 本身是并行的,因此您无需执行特殊操作即可使其并行运行

  • 使用时map,代码在执行程序而不是驱动程序中运行,这意味着您不能在函数中使用 DataFrame 或 RDD 引用(它们仅适用于驱动程序)。

假设 slices 和 fullData 都是 DataFrame,这个实现应该可以工作并提供一个并行数据帧,您可以稍后处理


from pyspark.sql import DataFrame
import pyspark.sql.functions as F

def main(slices: DataFrame, fullData: DataFrame) -> DataFrame:
   df = (fullData.join(slices, ["vehicle_id", "start_time"], "inner")
             .select("vehicle_id", "start_time", "latitude", "longitude")
             .withColumn("file_name", F.concat_ws("/",
                                                  F.lit("/playback"),
                                                  F.col("vehicle_id"),
                                                  F.regexp_replace(F.regexp_replace("start_time", " ", "_"), ":", "-")))
             .groupBy("file_name")
             .agg(F.collect_list(F.struct("latitude","longitude")).alias("positions"))
   )
   return df

推荐阅读