首页 > 解决方案 > 将函数传递给使用 pyspark 读取 S3 文件的 spark

问题描述

我在 s3 中有 GB 的数据,并试图通过参考以下链接在读取我的代码时带来并行性。

我使用下面的代码作为示例,但是当我运行时,它会出现以下错误:

对此的任何帮助都深表感谢,因为我对火花很陌生。

编辑:我必须使用并行性读取我的 s3 文件,这在任何帖子中都没有解释。标记重复的人请先阅读问题。

PicklingError:无法序列化对象:异常:您似乎正试图从广播变量、操作或转换中引用 SparkContext。SparkContext 只能在驱动程序上使用,不能在它在工作人员上运行的代码中使用。有关详细信息,请参阅 SPARK-5063。

class telco_cn:
    def __init__(self, sc):
        self.sc = sc

    def decode_module(msg):
        df=spark.read.json(msg)
        return df

    def consumer_input(self, sc, k_topic):
        a = sc.parallelize(['s3://bucket1/1575158401-51e09537-0ce5-c775-6beb-fd1b0a568e15.json'])
        d = a.map(lambda x: telco_cn.decode_module(x)).collect()
        print (d)
if __name__ == "__main__":
    cn = telco_cn(sc)
    cn.consumer_input(sc, '')

标签: pythonapache-sparkpysparkbigdata

解决方案


您正在尝试spark.read.jsonmapRDD 上的操作中调用。由于此映射操作将在 Spark 的 executor/worker 节点上执行,因此您无法在映射中引用 SparkContext/SparkSession 变量(在 Spark 驱动程序上定义)。这就是错误消息试图告诉您的内容。

为什么不直接打电话df=spark.read.json('s3://bucket1/1575158401-51e09537-0ce5-c775-6beb-fd1b0a568e15.json')呢?


推荐阅读