python - How to make unit-test mocks take effect on every Spark worker node (process)
Problem description
I have three files:
- spark_mock_dependency.py provides a user() method that reads /etc/user,
- spark_mock.py defines an Env class that uses the user() method to work out who the user is,
- spark_mock_test.py holds the unit tests for the Env class.
My environment has no /etc/user, so I need to mock the user() method to fake it. However, the unit test test_env_without_spark passes while test_env_with_spark fails. It looks as if the mock only takes effect on the driver node, and I cannot mock a class or a method on the worker nodes (processes). See my code and the error below.
Does anyone know how to mock a method on all worker nodes (processes)?
spark_mock_dependency.py
def user():
    with open('/etc/user') as f:
        return f.readline().strip()
spark_mock.py
from pkgname.spark_mock_dependency import user

class Env:
    user = user()
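Note that `Env.user` is evaluated once, at import time, in whichever Python process imports `spark_mock`. Here is a minimal, Spark-free sketch of that behaviour, using throwaway in-memory modules (the names `dep` and `mod` are illustrative stand-ins, not part of the actual package):

```python
import sys
import types
from unittest.mock import patch

# Build a stand-in dependency module with a module-level user() function.
dep = types.ModuleType('dep')
def _real_user():
    return 'real'
dep.user = _real_user
sys.modules['dep'] = dep

# Patch BEFORE the dependent module body runs: the import-time call sees the mock.
with patch('dep.user', return_value='anyone'):
    mod = types.ModuleType('mod')
    sys.modules['mod'] = mod
    exec("from dep import user\n"
         "class Env:\n"
         "    user = user()\n", mod.__dict__)

print(mod.Env.user)  # 'anyone' in this process; a fresh interpreter
                     # (like a Spark worker) would re-run the module body
                     # unpatched and call the real user() again.
```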
spark_mock_test.py
import unittest
from unittest.mock import patch
from pyspark import SparkConf, SparkContext

class EnvTest(unittest.TestCase):
    sc = None

    @classmethod
    def setUpClass(cls) -> None:
        conf = SparkConf().setMaster("local[2]").setAppName("testing")
        cls.sc = SparkContext(conf=conf)

    @patch('pkgname.spark_mock_dependency.user')
    def test_env_with_spark(self, user_mocker):
        user_mocker.return_value = 'anyone'
        from pkgname.spark_mock import Env
        rdd = self.__class__.sc.parallelize([1, 2])
        results = rdd.map(lambda _: f'{Env.user}').collect()
        self.assertTrue(all(res == 'anyone' for res in results))

    @patch('pkgname.spark_mock_dependency.user')
    def test_env_without_spark(self, user_mocker):
        user_mocker.return_value = 'anyone'
        from pkgname.spark_mock import Env
        self.assertEqual('anyone', Env.user)
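One pitfall when asserting over collected results: `assertTrue` applied to a list comprehension passes for any non-empty list, because Python lists are truthy regardless of their elements; `all(...)` is what actually checks each value. A quick illustration:

```python
results = ['real', 'real']            # pretend the mock did NOT take effect

# A non-empty list is truthy, so this check can never fail:
assert bool([res == 'anyone' for res in results]) is True

# all() inspects every element and catches the wrong values:
assert all(res == 'anyone' for res in results) is False
```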
Error message
py4j.protocol.Py4JJavaError: An error occurred while calling
z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0) (executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "lib/python3.6/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 586, in main
    func, profiler, deserializer, serializer = read_command(pickleSer, infile)
  File "lib/python3.6/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 69, in read_command
    command = serializer._read_with_length(file)
  File "lib/python3.6/site-packages/pyspark/python/lib/pyspark.zip/pyspark/serializers.py", line 160, in _read_with_length
    return self.loads(obj)
  File "lib/python3.6/site-packages/pyspark/python/lib/pyspark.zip/pyspark/serializers.py", line 430, in loads
    return pickle.loads(obj, encoding=encoding)
  File "lib/python3.6/site-packages/pkgname/spark_mock.py", line 4, in <module>
    class Env:
  File "lib/python3.6/site-packages/pkgname/spark_mock.py", line 5, in Env
    user = user()
  File "lib/python3.6/site-packages/pkgname/spark_mock_dependency.py", line 2, in hb_user
    with open('/etc/user') as f:
FileNotFoundError: [Errno 2] No such file or directory: '/etc/user'
Solution
You can import the user name in your unit test and patch it on the spark_mock module itself, i.e. where it is looked up.
Check the following code:
def test_env_with_spark(self, mocker):
    # `mocker` is the pytest-mock fixture (called user_mocker in the
    # original snippet); it patches the `user` name inside spark_mock.
    from pkgname.spark_mock import Env, user
    mocker.patch("pkgname.spark_mock.user", return_value='anyone')
    rdd = self.__class__.sc.parallelize([1, 2])
    results = rdd.map(lambda _: f'{Env.user}').collect()
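Because a patch only ever lives in the driver process, another workaround (a sketch, assuming the workers only need the resolved string rather than the `Env` class) is to read `Env.user` once on the driver and close over the plain value, so the worker processes never re-import `spark_mock` or call `user()` at all:

```python
# Spark-free sketch of the closure idea: capture the computed value,
# not the class that would recompute it at import time on each worker.
env_user = 'anyone'                    # would be Env.user, read on the driver

mapper = lambda _: f'{env_user}'       # closes over a plain string,
                                       # which is shipped by value

results = [mapper(x) for x in [1, 2]]  # stands in for rdd.map(...).collect()
print(results)  # ['anyone', 'anyone']
```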