multithreading - 多线程 PySpark,无法序列化对象异常
问题描述
_pickle.PicklingError:无法序列化对象:异常:您似乎正试图从广播变量、操作或转换中引用 SparkContext。SparkContext 只能在驱动程序上使用,不能在它在工作人员上运行的代码中使用。有关详细信息,请参阅 SPARK-5063。
超级简单的示例应用程序,可尝试并行运行一些计算。有效(有时),但大多数时候会因上述异常而崩溃。
我认为我没有嵌套 RDD,但是关于无法在工作人员中使用 sparkContext 的部分令人担忧,因为我认为我需要它来实现某种程度的并行性。如果我不能在工作线程中使用 sparkContext,我该如何取回计算结果?
在这一点上,我仍然希望它被序列化,并且在此之后将启用并行运行。但甚至无法让序列化的多线程版本运行....
from pyspark import SparkContext
import threading
THREADED = True. # Set this to false and it always works but is sequential
content_file = "file:///usr/local/Cellar/apache-spark/3.0.0/README.md"
sc = SparkContext("local", "first app")
content = sc.textFile(content_file).cache() # For the non-threaded version
class Worker(threading.Thread):
def __init__(self, letter, *args, **kwargs):
super().__init__(*args, **kwargs)
self.letter = letter
def run(self):
print(f"Starting: {self.letter}")
nums[self.letter] = content.filter(lambda s: self.letter in s).count() # SPOILER self.letter turns out to be the problem
print(f"{self.letter}: {nums[self.letter]}")
nums = {}
if THREADED:
threads = []
for char in range(ord('a'), ord('z')+1):
letter = chr(char)
threads.append(Worker(letter, name=letter))
for thread in threads:
thread.start()
for thread in threads:
thread.join()
else:
for char in range(ord('a'), ord('z')+1):
letter = chr(char)
nums[letter] = content.filter(lambda s: letter in s).count()
print(f"{letter}: {nums[letter]}")
print(nums)
即使我将代码更改为一次使用一个线程
threads = []
for char in range(ord('a'), ord('z')+1):
letter = chr(char)
thread = Worker(letter, name=letter)
threads.append(thread)
thread.start()
thread.join()
它引发了同样的异常,我猜是因为它试图将结果返回到工作线程而不是主线程(声明 SparkContext 的地方)。
如果 spark 在这里提供任何好处,我需要能够同时等待多个值。
我试图解决的真正问题是这样的:
__________RESULT_________
^ ^ ^
A B C
a1 ^ a2 b1 ^ b2 c1 ^ c2...
为了得到我的结果,我想并行计算 AB 和 C,并且这些部分中的每一个都必须并行计算 a1、a2、a3……。我将它分解为线程,以便我可以同时请求多个值,以便 spark 可以并行运行计算。
我创建上面的示例只是因为我想让线程正确,我不想弄清楚如何计算其中包含字符的行数。但这似乎对线程方面的审查非常简单。
这个小小的改变可以解决问题。self.letter 在 lambda 中爆炸,在过滤器调用消除崩溃之前取消引用它
def run(self):
print(f"Starting: {self.letter}")
letter = self.letter
nums[self.letter] = content.filter(lambda s: letter in s).count()
print(f"{self.letter}: {nums[self.letter]}")
解决方案
您正在尝试以不打算使用的方式使用 (Py)Spark。您正在将纯 python 数据处理与火花处理混合在一起,您可以完全使用火花。
Spark(和其他数据处理框架)的理念是,您定义应该如何处理您的数据,所有多线程+分发的东西只是一个独立的“配置”。
另外,我真的不明白你想通过使用多个线程来获得什么。每个线程都会:
- 必须从输入文件中读取每个字符
- 检查当前行是否包含分配给该线程的字母
- 数数
这将(如果有效)肯定会产生正确的结果,但效率低下,因为会有许多线程争夺该文件上的那些读取操作(请记住,每个线程都必须首先读取 COMPLETE 文件,能够根据其分配的字母进行过滤)。
与火花一起工作,而不是反对它,以充分利用它。
# imports and so on
content_file = "file:///usr/local/Cellar/apache-spark/3.0.0/README.md"
sc = SparkContext("local", "first app")
rdd = sc.textFile(content_file) # read from this file
rdd = rdd.flatMap(lambda line: [letter for letter in line]) # forward every letter of each line to the next operator
# initialize the letterRange "outside" of spark so we reduce the runtime-overhead
relevantLetterRange = [chr(char) for char in range(ord('a'), ord('z') + 1)]
rdd = rdd.filter(lambda letter: letter in relevantLetterRange)
rdd = rdd.keyBy(lambda letter: letter) # key by the letter itself
countsByKey = rdd.countByKey() # count by key
您当然可以简单地将其写在一个链中:
# imports and so on
content_file = "file:///usr/local/Cellar/apache-spark/3.0.0/README.md"
sc = SparkContext("local", "first app")
relevantLetterRange = [chr(char) for char in range(ord('a'), ord('z') + 1)]
countsByKey = sc.textFile(content_file)\
.flatMap(lambda line: [letter for letter in line])\
.filter(lambda letter: letter in relevantLetterRange)\
.keyBy(lambda letter: letter)
.countByKey()
推荐阅读
- forms - 命令参数始终返回 null - Xamarin Forms
- wordpress - Wordpress:根据 ACF 日期在存档中设置自定义字段
- c# - 更改货币符号位置
- javascript - Mongoose - 更新文档的地图
- html - CSS Flexbox - 移动视图上的响应问题
- java - Java如何在后台播放音乐
- python - 如何从烧瓶模板将 json 文件内容作为 application/json 发布?
- html - 检查输入时更改样式 (CSS)
- c++ - 当关联对话框没有出现时,如何将文件类型与 Code::blocks 关联?
- macos - 每当键入字符时,Visual Studio 代码都会自动滚动