首页 > 解决方案 > 多线程 PySpark,无法序列化对象异常

问题描述

_pickle.PicklingError:无法序列化对象:异常:您似乎正试图从广播变量、操作或转换中引用 SparkContext。SparkContext 只能在驱动程序上使用,不能在它在工作人员上运行的代码中使用。有关详细信息,请参阅 SPARK-5063。

超级简单的示例应用程序,可尝试并行运行一些计算。有效(有时),但大多数时候会因上述异常而崩溃。

我认为我没有嵌套 RDD,但是关于无法在工作人员中使用 sparkContext 的部分令人担忧,因为我认为我需要它来实现某种程度的并行性。如果我不能在工作线程中使用 sparkContext,我该如何取回计算结果?

在这一点上,我仍然希望它被序列化,并且在此之后将启用并行运行。但甚至无法让序列化的多线程版本运行....

from pyspark import SparkContext
import threading

THREADED = True. # Set this to false and it always works but is sequential

content_file = "file:///usr/local/Cellar/apache-spark/3.0.0/README.md"
sc = SparkContext("local", "first app")
content = sc.textFile(content_file).cache() # For the non-threaded version

class Worker(threading.Thread):
    def __init__(self, letter, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.letter = letter

    def run(self):
        print(f"Starting: {self.letter}")
        nums[self.letter] = content.filter(lambda s: self.letter in s).count() # SPOILER self.letter turns out to be the problem
        print(f"{self.letter}: {nums[self.letter]}")

nums = {}
if THREADED:
    threads = []
    for char in range(ord('a'), ord('z')+1):
        letter = chr(char)
        threads.append(Worker(letter, name=letter))

    for thread in threads:
        thread.start()

    for thread in threads:
        thread.join()

else:
    for char in range(ord('a'), ord('z')+1):
        letter = chr(char)
        nums[letter] = content.filter(lambda s: letter in s).count()
        print(f"{letter}: {nums[letter]}")

print(nums)

即使我将代码更改为一次使用一个线程

    threads = []
    for char in range(ord('a'), ord('z')+1):
        letter = chr(char)
        thread = Worker(letter, name=letter)
        threads.append(thread)
        thread.start()
        thread.join()

它引发了同样的异常,我猜是因为它试图将结果返回到工作线程而不是主线程(声明 SparkContext 的地方)。

如果 spark 在这里提供任何好处,我需要能够同时等待多个值。


我试图解决的真正问题是这样的:

__________RESULT_________
   ^         ^         ^
   A         B         C
a1 ^ a2   b1 ^ b2   c1 ^ c2...

为了得到我的结果,我想并行计算 AB 和 C,并且这些部分中的每一个都必须并行计算 a1、a2、a3……。我将它分解为线程,以便我可以同时请求多个值,以便 spark 可以并行运行计算。


我创建上面的示例只是因为我想让线程正确,我不想弄清楚如何计算其中包含字符的行数。但这似乎对线程方面的审查非常简单。


这个小小的改变可以解决问题。self.letter 在 lambda 中爆炸,在过滤器调用消除崩溃之前取消引用它

    def run(self):
        print(f"Starting: {self.letter}")
        letter = self.letter
        nums[self.letter] = content.filter(lambda s: letter in s).count()
        print(f"{self.letter}: {nums[self.letter]}")

标签: multithreadingapache-sparkpyspark

解决方案


您正在尝试以不打算使用的方式使用 (Py)Spark。您正在将纯 python 数据处理与火花处理混合在一起,您可以完全使用火花。

Spark(和其他数据处理框架)的理念是,您定义应该如何处理您的数据,所有多线程+分发的东西只是一个独立的“配置”。

另外,我真的不明白你想通过使用多个线程来获得什么。每个线程都会:

  1. 必须从输入文件中读取每个字符
  2. 检查当前行是否包含分配给该线程的字母
  3. 数数

这将(如果有效)肯定会产生正确的结果,但效率低下,因为会有许多线程争夺该文件上的那些读取操作(请记住,每个线程都必须首先读取 COMPLETE 文件,能够根据其分配的字母进行过滤)。

与火花一起工作,而不是反对它,以充分利用它。

# imports and so on

content_file = "file:///usr/local/Cellar/apache-spark/3.0.0/README.md"
sc = SparkContext("local", "first app")

rdd = sc.textFile(content_file) # read from this file
rdd = rdd.flatMap(lambda line: [letter for letter in line]) # forward every letter of each line to the next operator

# initialize the letterRange "outside" of spark so we reduce the runtime-overhead
relevantLetterRange = [chr(char) for char in range(ord('a'), ord('z') + 1)]
rdd = rdd.filter(lambda letter: letter in relevantLetterRange)

rdd = rdd.keyBy(lambda letter: letter) # key by the letter itself
countsByKey = rdd.countByKey() # count by key

您当然可以简单地将其写在一个链中:

# imports and so on

content_file = "file:///usr/local/Cellar/apache-spark/3.0.0/README.md"
sc = SparkContext("local", "first app")

relevantLetterRange = [chr(char) for char in range(ord('a'), ord('z') + 1)]

countsByKey = sc.textFile(content_file)\
    .flatMap(lambda line: [letter for letter in line])\
    .filter(lambda letter: letter in relevantLetterRange)\
    .keyBy(lambda letter: letter)
    .countByKey()

推荐阅读