PySpark: writing a DataFrame to a CSV file raises an error

Problem description

I started PySpark in local mode in order to learn. Everything went smoothly until I tried to write a DataFrame out to a CSV file with the code below. I may have broken something in my environment; please help me get past this:


days = df1.select('days').where("days BETWEEN '2020-01-01' AND '2020-01-31'").distinct().collect()
for day in days:
    _df = df1.where(f"days = {day[0]}")
    _df.coalesce(1).write.mode("append").option("header", "true").csv("C:/Users/zeyneb ben massoud/Documents/stage")

It gives me the error below, and it creates an empty folder at the target path without writing the file itself. I have been searching everywhere for a solution but have no clue:

Py4JJavaError                             Traceback (most recent call last)
<ipython-input-28-c02692e03ad7> in <module>
      2     _df = df1.where(f"days = {day[0]}")
      3 
----> 4     _df.coalesce(1).write.mode("append").option("header", "true").csv("C:/Users/zeyneb ben massoud/Documents/stage")
      5     #_df.coalesce(1).write.save("", "csv", "append")

~\anaconda3\lib\site-packages\pyspark\sql\readwriter.py in csv(self, path, mode, compression, sep, quote, escape, header, nullValue, escapeQuotes, quoteAll, dateFormat, timestampFormat, ignoreLeadingWhiteSpace, ignoreTrailingWhiteSpace, charToEscapeQuoteEscaping, encoding, emptyValue, lineSep)
   1370                        charToEscapeQuoteEscaping=charToEscapeQuoteEscaping,
   1371                        encoding=encoding, emptyValue=emptyValue, lineSep=lineSep)
-> 1372         self._jwrite.csv(path)
   1373 
   1374     def orc(self, path, mode=None, partitionBy=None, compression=None):

~\anaconda3\lib\site-packages\py4j\java_gateway.py in __call__(self, *args)
   1302 
   1303         answer = self.gateway_client.send_command(command)
-> 1304         return_value = get_return_value(
   1305             answer, self.gateway_client, self.target_id, self.name)
   1306 

~\anaconda3\lib\site-packages\pyspark\sql\utils.py in deco(*a, **kw)
    109     def deco(*a, **kw):
    110         try:
--> 111             return f(*a, **kw)
    112         except py4j.protocol.Py4JJavaError as e:
    113             converted = convert_exception(e.java_exception)

~\anaconda3\lib\site-packages\py4j\protocol.py in get_return_value(answer, gateway_client, target_id, name)
    324             value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
    325             if answer[1] == REFERENCE_TYPE:
--> 326                 raise Py4JJavaError(
    327                     "An error occurred while calling {0}{1}{2}.\n".
    328                     format(target_id, ".", name), value)

Py4JJavaError: An error occurred while calling o164.csv.
: org.apache.spark.SparkException: Job aborted.
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:231)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:188)
    at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:108)
    at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:106)
    at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:131)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:180)
    at org.apache.spark.sql.execution.SparkPlan$$Lambda$1654/502507831.apply(Unknown Source)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:218)
    at org.apache.spark.sql.execution.SparkPlan$$Lambda$1685/79577035.apply(Unknown Source)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:215)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:176)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:132)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:131)
    at org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:989)
    at org.apache.spark.sql.DataFrameWriter$$Lambda$3185/115329320.apply(Unknown Source)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
    at org.apache.spark.sql.execution.SQLExecution$$$Lambda$1459/1398665899.apply(Unknown Source)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
    at org.apache.spark.sql.execution.SQLExecution$$$Lambda$1452/1091979718.apply(Unknown Source)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
    at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:989)
    at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:438)
    at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:415)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:293)
    at org.apache.spark.sql.DataFrameWriter.csv(DataFrameWriter.scala:979)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:483)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.UnsatisfiedLinkError: org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Ljava/lang/String;I)Z
    at org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Native Method)
    at org.apache.hadoop.io.nativeio.NativeIO$Windows.access(NativeIO.java:645)
    at org.apache.hadoop.fs.FileUtil.canRead(FileUtil.java:1230)
    at org.apache.hadoop.fs.FileUtil.list(FileUtil.java:1435)
    at org.apache.hadoop.fs.RawLocalFileSystem.listStatus(RawLocalFileSystem.java:493)
    at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1868)
    at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1910)
    at org.apache.hadoop.fs.ChecksumFileSystem.listStatus(ChecksumFileSystem.java:678)
    at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1868)
    at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1910)
    at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.getAllCommittedTaskPaths(FileOutputCommitter.java:332)
    at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJobInternal(FileOutputCommitter.java:402)
    at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJob(FileOutputCommitter.java:375)
    at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.commitJob(HadoopMapReduceCommitProtocol.scala:182)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:220)
    ... 38 more

Tags: python, apache-spark, pyspark, save

Solution
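
The decisive line is at the bottom of the stack trace: java.lang.UnsatisfiedLinkError on NativeIO$Windows.access0. On Windows this is the classic symptom of a missing or version-mismatched winutils.exe/hadoop.dll: Spark's bundled Hadoop client needs these native binaries to list and commit files on the local filesystem, which is exactly the step (FileOutputCommitter.commitJob) that aborts here. Download the binaries built for the Hadoop version your Spark distribution ships with (community builds are published at https://github.com/steveloughran/winutils and https://github.com/cdarlint/winutils), place them in a bin folder, and make that folder visible before the JVM starts. A minimal sketch, assuming the hypothetical install path C:\hadoop and that this runs before the SparkSession is created:

import os

# Hypothetical location of winutils.exe and hadoop.dll; adjust to your setup.
# The binaries must match the Hadoop version your Spark build was compiled for.
os.environ["HADOOP_HOME"] = "C:\\hadoop"
os.environ["PATH"] = os.environ["HADOOP_HOME"] + "\\bin;" + os.environ["PATH"]

from pyspark.sql import SparkSession

# Start (or restart) the session only after PATH is set, so the spawned JVM
# can resolve hadoop.dll when loading Hadoop's native IO layer.
spark = (SparkSession.builder
         .master("local[*]")
         .appName("csv-write-test")
         .getOrCreate())

If the error persists, restart the Jupyter kernel so a fresh JVM picks up the new PATH; copying hadoop.dll into C:\Windows\System32 is another commonly reported remedy. Note that the empty folder is expected behavior either way: Spark writes a directory of part files, never a single CSV file, and here the job aborted before any part file was committed.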

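Separately, the loop in the question has a latent bug worth fixing: day[0] is a date value, but it is interpolated into the SQL predicate without quotes, so Spark parses days = 2020-01-01 as integer arithmetic (2020 minus 1 minus 1) rather than a date comparison. A sketch of the corrected loop, keeping the original output path:

for day in days:
    # Quote the interpolated value so Spark compares dates/strings,
    # not the arithmetic expression 2020-01-01 == 2018.
    _df = df1.where(f"days = '{day[0]}'")
    _df.coalesce(1).write.mode("append").option("header", "true") \
        .csv("C:/Users/zeyneb ben massoud/Documents/stage")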

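Finally, if you only need small CSV files on a local machine, collecting each day's slice to pandas sidesteps Hadoop's native committer entirely (viable only while the filtered data fits in driver memory). For example, the write call inside the loop could become the following, where the per-day file name is a hypothetical choice:

# pandas writes one plain file directly, bypassing Hadoop's native IO.
_df.toPandas().to_csv(
    f"C:/Users/zeyneb ben massoud/Documents/stage/{day[0]}.csv",
    index=False,
)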