apache-spark - PySpark 3.1.2 issue "expected zero arguments for construction of ClassDict"
Problem description
I have set up a Spark cluster, version 3.1.2, and I am using the Python API for Spark. I have some JSON data loaded into a dataframe, and I need to parse a nested column (ADSZ_2) that looks like the following format:
ADSZ_2: [{key,value}, {key,value}, {key,value}]
I developed the following code for this purpose:
...
...
def parseCell(array_data):
    final_list = []
    if array_data is not None:
        for record in array_data:
            record_dict = record.asDict()
            if "string1" in record_dict:
                string1 = remover(record_dict["string1"])
                record_dict["string1"] = string1
            if "string2" in record_dict:
                string2 = remover(record_dict["string2"])
                record_dict["string2"] = string2
            final_list.append(Row(**record_dict))
    return final_list

df = spark.read.load(data_path, multiline="false", format="json")
udf_fun = udf(lambda row: parseCell(row), ArrayType(StructType()))
df.withColumn("new-name", udf_fun(col("ADSZ_2"))).show()
...
When I run the code above, I get the following exception:
21/10/07 09:09:07 ERROR Executor: Exception in task 0.0 in stage 116.0 (TID 132)
net.razorvine.pickle.PickleException: expected zero arguments for construction of ClassDict (for pyspark.sql.types._create_row)
at net.razorvine.pickle.objects.ClassDictConstructor.construct(ClassDictConstructor.java:23)
at net.razorvine.pickle.Unpickler.load_reduce(Unpickler.java:773)
at net.razorvine.pickle.Unpickler.dispatch(Unpickler.java:213)
at net.razorvine.pickle.Unpickler.load(Unpickler.java:123)
at net.razorvine.pickle.Unpickler.loads(Unpickler.java:136)
at org.apache.spark.sql.execution.python.BatchEvalPythonExec.$anonfun$evaluate$6(BatchEvalPythonExec.scala:94)
at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:484)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:490)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:755)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:345)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:898)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:898)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:131)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
21/10/07 09:09:07 WARN TaskSetManager: Lost task 0.0 in stage 116.0 (TID 132) (hadoop-master.local executor driver): net.razorvine.pickle.PickleException: expected zero arguments for construction of ClassDict (for pyspark.sql.types._create_row)
at net.razorvine.pickle.objects.ClassDictConstructor.construct(ClassDictConstructor.java:23)
at net.razorvine.pickle.Unpickler.load_reduce(Unpickler.java:773)
at net.razorvine.pickle.Unpickler.dispatch(Unpickler.java:213)
at net.razorvine.pickle.Unpickler.load(Unpickler.java:123)
at net.razorvine.pickle.Unpickler.loads(Unpickler.java:136)
at org.apache.spark.sql.execution.python.BatchEvalPythonExec.$anonfun$evaluate$6(BatchEvalPythonExec.scala:94)
I have tried the various options given in [1], but none of those solutions worked. Where is the problem?
Is there a better way to accomplish this task?
Solution
I will propose an alternative solution in which you transform your rows with the dataframe's rdd. Here is a self-contained example that tries to mimic your data:
import pyspark.sql.functions as F
from pyspark.sql import Row
import pyspark.sql.types as T

df = spark.createDataFrame([Row(ADSZ_2=[{"string1": "a", "string2": "b"}, {"string1": "c", "string2": "d"}]),
                            Row(ADSZ_2=[{"string1": "e", "string2": "f"}, {"string1": "g", "not_taken" : "1", "string2": "h"}]),
                            Row(ADSZ_2=[{"string1": "i", "string2": "j"}, {"string1": "k", "not_taken" : "1", "string2": "l"}]),
                            Row(ADSZ_2=None),
                            Row(ADSZ_2=[None, {"string1": "m", "not_taken" : "1", "string2": "n"}])])
df.show(20, False)

def parseCell(row):
    final_list = []
    l = row["ADSZ_2"]
    if l:
        for record_dict in l:
            if record_dict:
                new_dict = {key: val for key, val in record_dict.items() if key in ["string1", "string2"]}
                if new_dict:
                    final_list.append(Row(**new_dict))
    return final_list

df_rdd = df.rdd.flatMap(lambda row: parseCell(row))
new_df = spark.createDataFrame(df_rdd)
new_df.show()
Output:
+----------------------------------------------------------------------------+
|ADSZ_2 |
+----------------------------------------------------------------------------+
|[{string1 -> a, string2 -> b}, {string1 -> c, string2 -> d}] |
|[{string1 -> e, string2 -> f}, {not_taken -> 1, string1 -> g, string2 -> h}]|
|[{string1 -> i, string2 -> j}, {not_taken -> 1, string1 -> k, string2 -> l}]|
|null |
|[null, {not_taken -> 1, string1 -> m, string2 -> n}] |
+----------------------------------------------------------------------------+
+-------+-------+
|string1|string2|
+-------+-------+
| a| b|
| c| d|
| e| f|
| g| h|
| i| j|
| k| l|
| m| n|
+-------+-------+
You need to make sure that every row produced by parseCell contains the same columns. That mismatch is also the likely cause of your original error: ArrayType(StructType()) declares an array of structs with zero fields, while your UDF returns Row objects that do have fields, and the disagreement between the declared and actual schema surfaces as the PickleException during deserialization. If you want to keep the UDF approach, you would have to spell out the full return schema, e.g. ArrayType(StructType([StructField("string1", StringType()), StructField("string2", StringType())])).
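To illustrate that constraint, here is a small pure-Python sketch (the normalize helper and the fixed key list are my own additions, not part of the answer's code) of one way to pad every record to the same two keys so each generated row carries identical columns:

```python
WANTED_KEYS = ("string1", "string2")

def normalize(record_dict):
    # Keep only the wanted keys and pad any missing one with None,
    # so every record yields exactly the same set of columns.
    return {key: record_dict.get(key) for key in WANTED_KEYS}

print(normalize({"string1": "g", "not_taken": "1"}))  # string2 is padded with None
```

Inside parseCell this could replace the dict comprehension, and Row(**normalize(record_dict)) would then always have both fields, even when a record omits one of them.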