pyspark - 如何使用 map 函数正确并行运行 pyspark 代码
问题描述
我无法在 emr 集群上将以下代码作为 pyspark 代码运行。我实际上是在尝试创建除以 2 个主键的数据组。切片是主键列的每个唯一组合,完整数据是包含整个数据作为数据框对象的数据框。该对象是使用 sqlContext 构建的。以下在并行运行时失败,抱怨 pickle 库出现序列化错误。具体来说_pickle.PicklingError: Could not serialize object: TypeError: can't pickle _thread.RLock objects
完成我想做的事情的最佳方法是什么?
def main(slices, fullData):
jsons = slices.rdd.map(
lambda i: handleSlices(i, fullData)).collect() # Run in parallel
# jsons = [handleSlices(i, fullData)
# for i in slices.collect()] # run in serial
return jsons
def handleSlices(row, fullData):
entries = fullData.filter((col("fullData.vehicle_id") == row.vehicle_id)
& (col("fullData.start_time") == row.start_time)).select(
"fullData.latitude", "fullData.longitude")
folder = "/playback/" + row.vehicle_id + "/"
fileName = folder + row.start_time.replace(" ", "_").replace(":", "-")
return (fileName, entries)
解决方案
您的代码中有两个主要问题:
Spark 本身是并行的,因此您无需执行特殊操作即可使其并行运行
使用时
map
,代码在执行程序而不是驱动程序中运行,这意味着您不能在函数中使用 DataFrame 或 RDD 引用(它们仅适用于驱动程序)。
假设 slices 和 fullData 都是 DataFrame,这个实现应该可以工作并提供一个并行数据帧,您可以稍后处理
from pyspark.sql import DataFrame
import pyspark.sql.functions as F
def main(slices: DataFrame, fullData: DataFrame) -> DataFrame:
df = (fullData.join(slices, ["vehicle_id", "start_time"], "inner")
.select("vehicle_id", "start_time", "latitude", "longitude")
.withColumn("file_name", F.concat_ws("/",
F.lit("/playback"),
F.col("vehicle_id"),
F.regexp_replace(F.regexp_replace("start_time", " ", "_"), ":", "-")))
.groupBy("file_name")
.agg(F.collect_list(F.struct("latitude","longitude")).alias("positions"))
)
return df
推荐阅读
- java - java:扩展一个内部类
- swift - 将动作作为 () -> Void 传递给组件是个好主意吗?
- c# - 在 Blazor 组件中显示条件内容的最佳方式是什么
- ansible - Cisco Nexus 的 Ansible 手册因提供商中的 nxapi 而失败
- python - Kivy 返回 AttributeError: object has no attribute 'name' when it clear has it
- java - 警告:RPC 失败:尝试在 java 和 Nodejs 中使用 GRPC 时,状态{code=NOT_FOUND,description=Not found,cause=null}
- go - 如何使用 Apache Thrift 定义类型 mgo objectId?
- powershell - Powershell - Start-Job - 传递基于变量的命令
- node.js - 如何让 Nodejs 在 CORS 中使用 k8 DNS 命名
- ruby-on-rails - 在 Rails 的顶级路径和嵌套路径中使用具有关联的相同控制器