PySpark: py4j.protocol.Py4JJavaError: An error occurred while calling o215.save

Problem description

I am trying to create and load a pickle file for a KMeans model in PySpark. I am using Python 3.7.9 and PySpark 3.0.1. I can create the pickle file, but I get the following error:

Code:

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.clustering import KMeans
import joblib

import config  # project configuration module that defines the input/output paths used below

spark = SparkSession.builder.appName('Session1').getOrCreate()

mindset_agg_df = spark.read.csv(config.mindset_level2_output, header=True, inferSchema=True)


def customer_seg(data, k_value):
    feature_cols = data.columns
    feature_cols.remove('Cst_ID')
    data = data.na.fill(0).drop('Cst_ID')
    assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
    assembled_data = assembler.transform(data)

    scaler = StandardScaler(inputCol='features', outputCol='scaled_features')
    transformed_data = scaler.fit(assembled_data).transform(assembled_data)

    kmeans = KMeans(featuresCol='scaled_features', k=k_value)
    model = kmeans.fit(transformed_data)
    data4 = model.transform(transformed_data)

    data5 = data4.withColumn("cluster_name", data4.prediction + 1)
    data6 = data5.select('cluster_name')
    data6 = data6.withColumn("id_col", F.monotonically_increasing_id())
    data = data.withColumn("id_col", F.monotonically_increasing_id())
    data_final = data.join(data6, on="id_col", how="inner")

    # You can drop the id_col column after this
    data_final = data_final.drop("id_col")

    # Saving the model as a pickle file
    # joblib.dump(data4, config.file_path_customer + 'kmeans_model.pkl')
    model_path = config.file_path_customer + 'model_kmeans'
    model.save(model_path)

    return data_final


# Calling the function
df_cluster = customer_seg(mindset_agg_df, 4)
df_cluster.show()

The above is the code I am running with PySpark 3.0.1.

Error:

Traceback (most recent call last):
File "C:/Users/1624004/PycharmProjects/predictcx_15122020/src/core/customer/customer_seg.py", line 87, in <module>
    df_cluster = customer_seg(mindset_agg_df, 4)
  File "C:/Users/1624004/PycharmProjects/predictcx_15122020/src/core/customer/customer_seg.py", line 74, in customer_seg
    model.save(model_path)
  File "C:\Users\1624004\PycharmProjects\predictcx\venv\lib\site-packages\pyspark\ml\util.py", line 224, in save
    self.write().save(path)
  File "C:\Users\1624004\PycharmProjects\predictcx\venv\lib\site-packages\pyspark\ml\util.py", line 175, in save
    self._jwrite.save(path)
  File "C:\Users\1624004\PycharmProjects\predictcx\venv\lib\site-packages\py4j\java_gateway.py", line 1305, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "C:\Users\1624004\PycharmProjects\predictcx\venv\lib\site-packages\pyspark\sql\utils.py", line 128, in deco
    return f(*a, **kw)
  File "C:\Users\1624004\PycharmProjects\predictcx\venv\lib\site-packages\py4j\protocol.py", line 328, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o215.save.
: org.apache.spark.SparkException: Job aborted.
    at org.apache.spark.internal.io.SparkHadoopWriter$.write(SparkHadoopWriter.scala:100)
    at org.apache.spark.rdd.PairRDDFunctions.$anonfun$saveAsHadoopDataset$1(PairRDDFunctions.scala:1090)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:388)
    at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopDataset(PairRDDFunctions.scala:1088)
    at org.apache.spark.rdd.PairRDDFunctions.$anonfun$saveAsHadoopFile$4(PairRDDFunctions.scala:1061)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:388)
    at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:1026)
    at org.apache.spark.rdd.PairRDDFunctions.$anonfun$saveAsHadoopFile$3(PairRDDFunctions.scala:1008)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:388)
    at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:1007)
    at org.apache.spark.rdd.PairRDDFunctions.$anonfun$saveAsHadoopFile$2(PairRDDFunctions.scala:964)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:388)
    at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:962)
    at org.apache.spark.rdd.RDD.$anonfun$saveAsTextFile$2(RDD.scala:1552)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:388)
    at org.apache.spark.rdd.RDD.saveAsTextFile(RDD.scala:1552)
    at org.apache.spark.rdd.RDD.$anonfun$saveAsTextFile$1(RDD.scala:1538)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:388)
    at org.apache.spark.rdd.RDD.saveAsTextFile(RDD.scala:1538)
    at org.apache.spark.ml.util.DefaultParamsWriter$.saveMetadata(ReadWrite.scala:413)
    at org.apache.spark.ml.clustering.InternalKMeansModelWriter.write(KMeans.scala:197)
    at org.apache.spark.ml.util.GeneralMLWriter.saveImpl(ReadWrite.scala:260)
    at org.apache.spark.ml.util.MLWriter.save(ReadWrite.scala:168)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 46.0 failed 1 times, most recent failure: Lost task 0.0 in stage 46.0 (TID 245, 01HW1729894, executor driver): java.io.IOException: (null) entry in command string: null chmod 0644 C:\Users\1624004\PycharmProjects\predictcx_15122020\src\filestore\output\segmentation\model_kmeans\metadata\_temporary\0\_temporary\attempt_20210113101034_0096_m_000000_0\part-00000
    at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:773)
    at org.apache.hadoop.util.Shell.execCommand(Shell.java:869)
    at org.apache.hadoop.util.Shell.execCommand(Shell.java:852)
    at org.apache.hadoop.fs.RawLocalFileSystem.setPermission(RawLocalFileSystem.java:733)
    at org.apache.hadoop.fs.RawLocalFileSystem$LocalFSFileOutputStream.<init>(RawLocalFileSystem.java:225)
    at org.apache.hadoop.fs.RawLocalFileSystem$LocalFSFileOutputStream.<init>(RawLocalFileSystem.java:209)
    at org.apache.hadoop.fs.RawLocalFileSystem.createOutputStreamWithMode(RawLocalFileSystem.java:307)
    at org.apache.hadoop.fs.RawLocalFileSystem.create(RawLocalFileSystem.java:296)
    at org.apache.hadoop.fs.RawLocalFileSystem.create(RawLocalFileSystem.java:328)
    at org.apache.hadoop.fs.ChecksumFileSystem$ChecksumFSOutputSummer.<init>(ChecksumFileSystem.java:398)
    at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:461)
    at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:440)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:911)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:804)
    at org.apache.hadoop.mapred.TextOutputFormat.getRecordWriter(TextOutputFormat.java:123)
    at org.apache.spark.internal.io.HadoopMapRedWriteConfigUtil.initWriter(SparkHadoopWriter.scala:230)
    at org.apache.spark.internal.io.SparkHadoopWriter$.executeTask(SparkHadoopWriter.scala:120)
    at org.apache.spark.internal.io.SparkHadoopWriter$.$anonfun$write$1(SparkHadoopWriter.scala:83)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:127)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:446)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:449)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2059)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2008)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2007)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2007)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:973)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:973)
    at scala.Option.foreach(Option.scala:407)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:973)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2239)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2188)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2177)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:775)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2099)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2120)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2152)
    at org.apache.spark.internal.io.SparkHadoopWriter$.write(SparkHadoopWriter.scala:78)
    ... 51 more
Caused by: java.io.IOException: (null) entry in command string: null chmod 0644 C:\Users\1624004\PycharmProjects\predictcx_15122020\src\filestore\output\segmentation\model_kmeans\metadata\_temporary\0\_temporary\attempt_20210113101034_0096_m_000000_0\part-00000
    at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:773)
    at org.apache.hadoop.util.Shell.execCommand(Shell.java:869)
    at org.apache.hadoop.util.Shell.execCommand(Shell.java:852)
    at org.apache.hadoop.fs.RawLocalFileSystem.setPermission(RawLocalFileSystem.java:733)
    at org.apache.hadoop.fs.RawLocalFileSystem$LocalFSFileOutputStream.<init>(RawLocalFileSystem.java:225)
    at org.apache.hadoop.fs.RawLocalFileSystem$LocalFSFileOutputStream.<init>(RawLocalFileSystem.java:209)
    at org.apache.hadoop.fs.RawLocalFileSystem.createOutputStreamWithMode(RawLocalFileSystem.java:307)
    at org.apache.hadoop.fs.RawLocalFileSystem.create(RawLocalFileSystem.java:296)
    at org.apache.hadoop.fs.RawLocalFileSystem.create(RawLocalFileSystem.java:328)
    at org.apache.hadoop.fs.ChecksumFileSystem$ChecksumFSOutputSummer.<init>(ChecksumFileSystem.java:398)
    at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:461)
    at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:440)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:911)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:804)
    at org.apache.hadoop.mapred.TextOutputFormat.getRecordWriter(TextOutputFormat.java:123)
    at org.apache.spark.internal.io.HadoopMapRedWriteConfigUtil.initWriter(SparkHadoopWriter.scala:230)
    at org.apache.spark.internal.io.SparkHadoopWriter$.executeTask(SparkHadoopWriter.scala:120)
    at org.apache.spark.internal.io.SparkHadoopWriter$.$anonfun$write$1(SparkHadoopWriter.scala:83)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:127)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:446)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:449)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    ... 1 more


Process finished with exit code 1

That is the complete error I get when running the code above.

Tags: python, machine-learning, pyspark, pickle, k-means

Solution


You cannot pickle a DataFrame. I think you are trying to save the model, not the DataFrame data4. If so, you can call the save method on the model:

# ... previous code in the function ...

kmeans = KMeans(featuresCol='scaled_features', k=k_value)
model = kmeans.fit(transformed_data)
model.save(config.file_path_customer + 'kmeans_model.pkl')
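
The question also mentions loading the model back. For completeness, here is a minimal sketch of reloading the saved model, assuming config.file_path_customer and transformed_data are defined as in the question. Note that despite the '.pkl' suffix, model.save writes a Spark ML directory rather than a Python pickle, so it is read back with KMeansModel.load, not joblib:

from pyspark.ml.clustering import KMeansModel

# Same path that was used when saving the model above (a directory in Spark ML format)
model_path = config.file_path_customer + 'kmeans_model.pkl'

# Reload the fitted model and use it to assign clusters to new data
loaded_model = KMeansModel.load(model_path)
predictions = loaded_model.transform(transformed_data)

If the cluster assignments themselves (data4 or data_final) need to be persisted, they can be written separately as a DataFrame, for example with data_final.write.parquet(...).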
