Pyspark 3.1.2 issue "expected zero arguments for construction of ClassDict"

Problem description

I have set up a Spark cluster running version 3.1.2 and I am using the Python API for Spark. I have some JSON data loaded into a dataframe, and I have to parse a nested column (ADSZ_2) that looks like the following format:

ADSZ_2: [{key,value}, {key,value}, {key,value}]
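
For illustration, assuming the field names used in the code below (string1, string2), a single JSON record in this shape could look roughly like:

{"ADSZ_2": [{"string1": "a", "string2": "b"}, {"string1": "c", "string2": "d"}]}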

I developed the following code for this purpose:

...
...
def parseCell(array_data):
    final_list = []
    if array_data is not None:
        for record in array_data:
            record_dict = record.asDict()
            if "string1" in record_dict:
                string1 = remover(record_dict["string1"])
                record_dict["string1"] = string1
            if "string2" in record_dict:
                string2 = remover(record_dict["string2"])
                record_dict["string2"] = string2
            final_list.append(Row(**record_dict))
        return final_list
        
        
        
df = spark.read.load(data_path, multiline="false", format="json")
udf_fun = udf(lambda row: parseCell(row), ArrayType(StructType()))
df.withColumn("new-name", udf_fun(col("ADSZ_2"))).show()
...

When I run the above code, I get the following exception:

21/10/07 09:09:07 ERROR Executor: Exception in task 0.0 in stage 116.0 (TID 132)
net.razorvine.pickle.PickleException: expected zero arguments for construction of ClassDict (for pyspark.sql.types._create_row)
    at net.razorvine.pickle.objects.ClassDictConstructor.construct(ClassDictConstructor.java:23)
    at net.razorvine.pickle.Unpickler.load_reduce(Unpickler.java:773)
    at net.razorvine.pickle.Unpickler.dispatch(Unpickler.java:213)
    at net.razorvine.pickle.Unpickler.load(Unpickler.java:123)
    at net.razorvine.pickle.Unpickler.loads(Unpickler.java:136)
    at org.apache.spark.sql.execution.python.BatchEvalPythonExec.$anonfun$evaluate$6(BatchEvalPythonExec.scala:94)
    at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:484)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:490)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:755)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:345)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:898)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:898)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:131)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
21/10/07 09:09:07 WARN TaskSetManager: Lost task 0.0 in stage 116.0 (TID 132) (hadoop-master.local executor driver): net.razorvine.pickle.PickleException: expected zero arguments for construction of ClassDict (for pyspark.sql.types._create_row)
    at net.razorvine.pickle.objects.ClassDictConstructor.construct(ClassDictConstructor.java:23)
    at net.razorvine.pickle.Unpickler.load_reduce(Unpickler.java:773)
    at net.razorvine.pickle.Unpickler.dispatch(Unpickler.java:213)
    at net.razorvine.pickle.Unpickler.load(Unpickler.java:123)
    at net.razorvine.pickle.Unpickler.loads(Unpickler.java:136)
    at org.apache.spark.sql.execution.python.BatchEvalPythonExec.$anonfun$evaluate$6(BatchEvalPythonExec.scala:94)

I tried the various options given in [1], but none of those solutions worked. Where is the problem?

Is there a better way to accomplish this?

Tags: apache-spark, pyspark, apache-spark-sql, user-defined-functions

Solution


I will propose an alternative solution in which you transform your rows using the dataframe's rdd. Here is a self-contained example in which I try to adopt your data:

import pyspark.sql.functions as F
from pyspark.sql import Row
import pyspark.sql.types as T


# Sample data mimicking the nested ADSZ_2 column, including records with an
# extra key, a null column and a null element, to cover the edge cases.
df = spark.createDataFrame([Row(ADSZ_2=[{"string1": "a", "string2": "b"}, {"string1": "c", "string2": "d"}]),
                            Row(ADSZ_2=[{"string1": "e", "string2": "f"}, {"string1": "g", "not_taken" : "1", "string2": "h"}]),
                            Row(ADSZ_2=[{"string1": "i", "string2": "j"}, {"string1": "k", "not_taken" : "1", "string2": "l"}]),
                            Row(ADSZ_2=None),
                            Row(ADSZ_2=[None, {"string1": "m", "not_taken" : "1", "string2": "n"}])])
df.show(20, False)

def parseCell(row):
    """Turn one input row into zero or more Rows containing only string1 and string2."""
    final_list = []
    l = row["ADSZ_2"]
    if l:
        for record_dict in l:
            if record_dict:
                # Keep only the keys we care about.
                new_dict = {key: val for key, val in record_dict.items() if key in ["string1", "string2"]}
                if new_dict:
                    final_list.append(Row(**new_dict))
    return final_list

# flatMap lets each input row contribute any number of output rows (including zero).
df_rdd = df.rdd.flatMap(lambda row: parseCell(row))
new_df = spark.createDataFrame(df_rdd)
new_df.show()

Output:

+----------------------------------------------------------------------------+
|ADSZ_2                                                                      |
+----------------------------------------------------------------------------+
|[{string1 -> a, string2 -> b}, {string1 -> c, string2 -> d}]                |
|[{string1 -> e, string2 -> f}, {not_taken -> 1, string1 -> g, string2 -> h}]|
|[{string1 -> i, string2 -> j}, {not_taken -> 1, string1 -> k, string2 -> l}]|
|null                                                                        |
|[null, {not_taken -> 1, string1 -> m, string2 -> n}]                        |
+----------------------------------------------------------------------------+

+-------+-------+
|string1|string2|
+-------+-------+
|      a|      b|
|      c|      d|
|      e|      f|
|      g|      h|
|      i|      j|
|      k|      l|
|      m|      n|
+-------+-------+

You need to make sure that all the rows produced by parseCell contain the right number of columns.
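
If you do not want to rely on schema inference for that, one option is to pass an explicit schema to createDataFrame. A minimal sketch, assuming string1 and string2 are both plain strings (out_schema is just an illustrative name):

# Declare the output schema explicitly instead of letting Spark infer it from
# the first rows. Assumption: both fields are strings; out_schema is an
# illustrative name.
out_schema = T.StructType([
    T.StructField("string1", T.StringType(), True),
    T.StructField("string2", T.StringType(), True),
])

new_df = spark.createDataFrame(df_rdd, schema=out_schema)
new_df.show()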
