python - 将函数传递给使用 pyspark 读取 S3 文件的 spark
问题描述
我在 s3 中有 GB 的数据,并试图通过参考以下链接在读取我的代码时带来并行性。
我使用下面的代码作为示例,但是当我运行时,它会出现以下错误:
对此的任何帮助都深表感谢,因为我对火花很陌生。
编辑:我必须使用并行性读取我的 s3 文件,这在任何帖子中都没有解释。标记重复的人请先阅读问题。
PicklingError:无法序列化对象:异常:您似乎正试图从广播变量、操作或转换中引用 SparkContext。SparkContext 只能在驱动程序上使用,不能在它在工作人员上运行的代码中使用。有关详细信息,请参阅 SPARK-5063。
class telco_cn:
def __init__(self, sc):
self.sc = sc
def decode_module(msg):
df=spark.read.json(msg)
return df
def consumer_input(self, sc, k_topic):
a = sc.parallelize(['s3://bucket1/1575158401-51e09537-0ce5-c775-6beb-fd1b0a568e15.json'])
d = a.map(lambda x: telco_cn.decode_module(x)).collect()
print (d)
if __name__ == "__main__":
cn = telco_cn(sc)
cn.consumer_input(sc, '')
解决方案
您正在尝试spark.read.json
从map
RDD 上的操作中调用。由于此映射操作将在 Spark 的 executor/worker 节点上执行,因此您无法在映射中引用 SparkContext/SparkSession 变量(在 Spark 驱动程序上定义)。这就是错误消息试图告诉您的内容。
为什么不直接打电话df=spark.read.json('s3://bucket1/1575158401-51e09537-0ce5-c775-6beb-fd1b0a568e15.json')
呢?
推荐阅读
- python - 如何获取单选按钮来更新直方图 Matplotlib Python
- reactjs - 如何将ant design类组件转换为功能组件
- java - 使用 Jsoup 将数据从网页解析到 Android 应用
- python - 可以导入pytorch但不能导入torch
- angular - Angular 2方式绑定组件
- python - 使用 Selenium 和 Python 从 svg 标签中的 g 标签获取文本
- javascript - 在evennode.com的真实主机上使用mongodb的问题
- excel - 如何改进将 14 列月薪表转换为 4 列数据集的 VBA 代码
- javascript - 如何在 DeviceMotionEvent javascript 中设置间隔
- sqlite - 如何将片假名添加到 SQLite3?