How to make unit-test mocks take effect on every Spark worker node (process)

Problem description

I have three files:

  1. spark_mock_dependency.py provides a user() method that reads /etc/user,
  2. spark_mock.py creates an Env class, which uses the user() method to find out who the user is,
  3. spark_mock_test.py contains the unit tests for the Env class.

My environment has no /etc/user, so I need to mock the user() method to fake it. However, the unit test test_env_without_spark passes while test_env_with_spark fails. It looks as if the mock only takes effect on the driver node, and I cannot mock a class or method on the worker nodes (processes). See my code and the error below.

Does anyone know how to mock a method on all worker nodes (processes)?

spark_mock_dependency.py

def user():
    with open('/etc/user') as f:
        return f.readline().strip()

spark_mock.py

from pkgname.spark_mock_dependency import user


class Env:
    # evaluated once, at import time, on every process that imports this module
    user = user()

spark_mock_test.py

import unittest
from unittest.mock import patch
from pyspark import SparkConf, SparkContext


class EnvTest(unittest.TestCase):
    sc = None

    @classmethod
    def setUpClass(cls) -> None:
        conf = SparkConf().setMaster("local[2]").setAppName("testing")
        cls.sc = SparkContext(conf=conf)

    @patch('pkgname.spark_mock_dependency.user')
    def test_env_with_spark(self, user_mocker):
        user_mocker.return_value = 'anyone'

        from pkgname.spark_mock import Env
        rdd = self.__class__.sc.parallelize([1, 2])
        results = rdd.map(lambda _: f'{Env.user}').collect()
        self.assertTrue(all(res == 'anyone' for res in results))

    @patch('pkgname.spark_mock_dependency.user')
    def test_env_without_spark(self, user_mocker):
        user_mocker.return_value = 'anyone'
        from pkgname.spark_mock import Env
        self.assertEqual('anyone', Env.user)

Error message

py4j.protocol.Py4JJavaError: An error occurred while calling
z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0) (executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "lib/python3.6/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 586, in main
    func, profiler, deserializer, serializer = read_command(pickleSer, infile)
  File "lib/python3.6/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 69, in read_command
    command = serializer._read_with_length(file)
  File "lib/python3.6/site-packages/pyspark/python/lib/pyspark.zip/pyspark/serializers.py", line 160, in _read_with_length
    return self.loads(obj)
  File "lib/python3.6/site-packages/pyspark/python/lib/pyspark.zip/pyspark/serializers.py", line 430, in loads
    return pickle.loads(obj, encoding=encoding)
  File "lib/python3.6/site-packages/pkgname/spark_mock.py", line 4, in <module>
    class Env:
  File "lib/python3.6/site-packages/pkgname/spark_mock.py", line 5, in Env
    user = user()
  File "lib/python3.6/site-packages/pkgname/spark_mock_dependency.py", line 2, in user
    with open('/etc/user') as f:
FileNotFoundError: [Errno 2] No such file or directory: '/etc/user'

Tags: python, unit-testing, apache-spark, pyspark, mocking

Solution

You can patch user on the module where it is used (pkgname.spark_mock) instead of on the module that defines it.

Check the following code:

def test_env_with_spark(self, mocker):
    # 'mocker' is the pytest-mock fixture; it patches the name where it is
    # used (pkgname.spark_mock) rather than where it is defined
    from pkgname.spark_mock import Env
    mocker.patch("pkgname.spark_mock.user", return_value='anyone')
    rdd = self.__class__.sc.parallelize([1, 2])
    results = rdd.map(lambda _: f'{Env.user}').collect()
    assert all(res == 'anyone' for res in results)
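Why the patch target matters can be shown without Spark at all. The sketch below builds two throwaway in-memory modules, dep and app (hypothetical stand-ins for spark_mock_dependency and spark_mock): because app copies the user reference with `from dep import user`, patching dep.user leaves app's copy untouched, while patching app.user is what the consuming code actually sees.

```python
import sys
import types
from unittest.mock import patch

# Hypothetical in-memory stand-in for pkgname.spark_mock_dependency
dep = types.ModuleType("dep")
exec("def user():\n    raise FileNotFoundError('/etc/user')", dep.__dict__)
sys.modules["dep"] = dep

# Hypothetical stand-in for pkgname.spark_mock: it copies the reference
app = types.ModuleType("app")
exec("from dep import user", app.__dict__)
sys.modules["app"] = app

# Patching the definition site replaces dep.user, but app still holds
# its own reference to the original, unmocked function.
with patch("dep.user", return_value="anyone"):
    assert dep.user() == "anyone"
    assert app.user is not dep.user  # app's copy is untouched

# Patching the name where it is used is what the test needs.
with patch("app.user", return_value="anyone"):
    assert app.user() == "anyone"
```

The same logic explains the traceback above: each Spark Python worker re-imports pkgname.spark_mock, so the import-time call `user = user()` runs there with whatever `user` is bound to in that module at import time; a driver-side patch on the dependency module never reaches that binding.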
