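python - PySpark: writing a DataFrame to a CSV file fails with an error
Problem description
I launched PySpark in local mode to learn it. Everything went fine until I tried to write a DataFrame to a CSV file with the code below. I have already set up the environment, but it still doesn't work. Please help me get past this problem: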
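# Collect the distinct January 2020 dates (a small list of Row objects)
days = df1.select('days').where("days BETWEEN '2020-01-01' AND '2020-01-31'").distinct().collect()
for day in days:
    # Quote the date literal; unquoted, 2020-01-05 is parsed as 2020 - 1 - 5
    _df = df1.where(f"days = '{day[0]}'")
    _df.coalesce(1).write.mode("append").option("header", "true").csv("C:/Users/zeyneb ben massoud/Documents/stage")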
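One detail in the loop is worth checking separately from the write error: if the date is interpolated into the filter string without quotes (as in `f"days = {day[0]}"`), Spark SQL reads `days = 2020-01-05` as integer subtraction rather than as a date comparison, so the filter will not select the intended rows. The sketch below shows the difference in plain Python; `day_equals_predicate` is a hypothetical helper, and it assumes the values are dates or `'YYYY-MM-DD'` strings that contain no quote characters.

```python
import datetime


def day_equals_predicate(column, day):
    """Build a Spark SQL filter string comparing `column` to one date.

    Hypothetical helper. The value is wrapped in single quotes so Spark
    treats it as a literal; str() of a datetime.date gives 'YYYY-MM-DD'.
    Assumes `day` contains no single quotes (no escaping is done).
    """
    return f"{column} = '{day}'"


# Unquoted, the same date turns into an arithmetic expression: 2020 - 01 - 05
unquoted = f"days = {datetime.date(2020, 1, 5)}"
quoted = day_equals_predicate("days", datetime.date(2020, 1, 5))
print(unquoted)  # days = 2020-01-05
print(quoted)    # days = '2020-01-05'
```

Inside the loop this would read `df1.where(day_equals_predicate("days", day[0]))`. A column expression such as `df1.where(F.col("days") == day[0])` (with `from pyspark.sql import functions as F`) avoids building SQL strings altogether. Note that this filter issue is independent of the `UnsatisfiedLinkError` in the trace; fixing it alone will not make the write succeed.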
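Running the loop gives me the error below. It also creates an empty folder at the output path, but no CSV file is written into it. I have been searching for a solution but haven't found any leads:
Error: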
Py4JJavaError Traceback (most recent call last)
<ipython-input-28-c02692e03ad7> in <module>
2 _df = df1.where(f"days = {day[0]}")
3
----> 4 _df.coalesce(1).write.mode("append").option("header", "true").csv("C:/Users/zeyneb ben massoud/Documents/stage")
5 #_df.coalesce(1).write.save("", "csv", "append")
~\anaconda3\lib\site-packages\pyspark\sql\readwriter.py in csv(self, path, mode, compression, sep, quote, escape, header, nullValue, escapeQuotes, quoteAll, dateFormat, timestampFormat, ignoreLeadingWhiteSpace, ignoreTrailingWhiteSpace, charToEscapeQuoteEscaping, encoding, emptyValue, lineSep)
1370 charToEscapeQuoteEscaping=charToEscapeQuoteEscaping,
1371 encoding=encoding, emptyValue=emptyValue, lineSep=lineSep)
-> 1372 self._jwrite.csv(path)
1373
1374 def orc(self, path, mode=None, partitionBy=None, compression=None):
~\anaconda3\lib\site-packages\py4j\java_gateway.py in __call__(self, *args)
1302
1303 answer = self.gateway_client.send_command(command)
-> 1304 return_value = get_return_value(
1305 answer, self.gateway_client, self.target_id, self.name)
1306
~\anaconda3\lib\site-packages\pyspark\sql\utils.py in deco(*a, **kw)
109 def deco(*a, **kw):
110 try:
--> 111 return f(*a, **kw)
112 except py4j.protocol.Py4JJavaError as e:
113 converted = convert_exception(e.java_exception)
~\anaconda3\lib\site-packages\py4j\protocol.py in get_return_value(answer, gateway_client, target_id, name)
324 value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
325 if answer[1] == REFERENCE_TYPE:
--> 326 raise Py4JJavaError(
327 "An error occurred while calling {0}{1}{2}.\n".
328 format(target_id, ".", name), value)
Py4JJavaError: An error occurred while calling o164.csv.
: org.apache.spark.SparkException: Job aborted.
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:231)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:188)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:108)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:106)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:131)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:180)
at org.apache.spark.sql.execution.SparkPlan$$Lambda$1654/502507831.apply(Unknown Source)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:218)
at org.apache.spark.sql.execution.SparkPlan$$Lambda$1685/79577035.apply(Unknown Source)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:215)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:176)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:132)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:131)
at org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:989)
at org.apache.spark.sql.DataFrameWriter$$Lambda$3185/115329320.apply(Unknown Source)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
at org.apache.spark.sql.execution.SQLExecution$$$Lambda$1459/1398665899.apply(Unknown Source)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
at org.apache.spark.sql.execution.SQLExecution$$$Lambda$1452/1091979718.apply(Unknown Source)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:989)
at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:438)
at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:415)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:293)
at org.apache.spark.sql.DataFrameWriter.csv(DataFrameWriter.scala:979)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:483)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.UnsatisfiedLinkError: org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Ljava/lang/String;I)Z
at org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Native Method)
at org.apache.hadoop.io.nativeio.NativeIO$Windows.access(NativeIO.java:645)
at org.apache.hadoop.fs.FileUtil.canRead(FileUtil.java:1230)
at org.apache.hadoop.fs.FileUtil.list(FileUtil.java:1435)
at org.apache.hadoop.fs.RawLocalFileSystem.listStatus(RawLocalFileSystem.java:493)
at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1868)
at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1910)
at org.apache.hadoop.fs.ChecksumFileSystem.listStatus(ChecksumFileSystem.java:678)
at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1868)
at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1910)
at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.getAllCommittedTaskPaths(FileOutputCommitter.java:332)
at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJobInternal(FileOutputCommitter.java:402)
at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJob(FileOutputCommitter.java:375)
at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.commitJob(HadoopMapReduceCommitProtocol.scala:182)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:220)
... 38 more
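The root cause at the bottom of the trace is `java.lang.UnsatisfiedLinkError: org.apache.hadoop.io.nativeio.NativeIO$Windows.access0`, raised while `FileOutputCommitter.commitJob` lists the output directory. On Windows this typically means Hadoop's native library (`hadoop.dll`) could not be loaded, or that the `winutils.exe`/`hadoop.dll` being picked up do not match the Hadoop version bundled with PySpark. As a minimal sketch, assuming the common layout where both files live in `%HADOOP_HOME%\bin` and that folder is on `PATH`, the check below reports which pieces are missing. `missing_hadoop_files` and `hadoop_bin_on_path` are hypothetical helpers, not part of PySpark.

```python
import os
from pathlib import Path

# Assumption: the Windows helper binaries Hadoop needs live in %HADOOP_HOME%\bin
REQUIRED_FILES = ("winutils.exe", "hadoop.dll")


def missing_hadoop_files(hadoop_home=None):
    """Return the helper files not found in <hadoop_home>/bin.

    Falls back to the HADOOP_HOME environment variable; if neither is set,
    every required file is reported as missing.
    """
    home = hadoop_home if hadoop_home is not None else os.environ.get("HADOOP_HOME")
    if not home:
        return list(REQUIRED_FILES)
    bin_dir = Path(home) / "bin"
    return [name for name in REQUIRED_FILES if not (bin_dir / name).is_file()]


def hadoop_bin_on_path(hadoop_home, path_value=None):
    """Return True if <hadoop_home>/bin appears as an entry of PATH."""
    path_value = os.environ.get("PATH", "") if path_value is None else path_value
    target = os.path.normcase(os.path.normpath(str(Path(hadoop_home) / "bin")))
    return any(
        os.path.normcase(os.path.normpath(entry)) == target
        for entry in path_value.split(os.pathsep)
        if entry
    )


if __name__ == "__main__":
    home = os.environ.get("HADOOP_HOME")
    print("HADOOP_HOME:", home)
    print("Missing in bin:", missing_hadoop_files(home) or "none")
    if home:
        print("bin on PATH:", hadoop_bin_on_path(home))
```

If anything is reported missing, the usual remedy is to obtain `winutils.exe` and `hadoop.dll` built for the same Hadoop version as the `hadoop-*.jar` files under `pyspark/jars`, place them in `%HADOOP_HOME%\bin`, add that folder to `PATH`, and restart the Python/Jupyter process. The JVM behind PySpark is started once per session, so environment changes made after it launches are not seen.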