Spark Structured Streaming writeStream to console problem

Problem Description

I am using a Jupyter notebook on Windows to write a simple Spark Structured Streaming application.

Here is my code:

import sys
import time

from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, split

spark = SparkSession.builder.appName("StructuredNetworkCount").getOrCreate()

# Read lines streamed from a local socket (e.g. started with `nc -lk 9999`).
lines = spark.readStream.format("socket").option("host", "localhost").option("port", 9999).load()

# Split each line into words and count the occurrences of each word.
words = lines.select(explode(split(lines.value, " ")).alias("word"))
wordCounts = words.groupBy("word").count()

# Print the complete, updated counts to the console after every micro-batch.
query = wordCounts.writeStream.outputMode("complete").format("console").start()
query.awaitTermination()

I am getting the following error, which I have not been able to resolve:

Py4JJavaError: An error occurred while calling o183.start.
: java.io.IOException: (null) entry in command string: null chmod 0644 C:\Users\fhaou\AppData\Local\Temp\temporary-f4c98508-2cbe-4009-bdae-cf93d5b722be\.metadata.0666cc4c-27d8-4ccc-bf10-6ec00474859b.tmp
    at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:770)
    at org.apache.hadoop.util.Shell.execCommand(Shell.java:866)
    at org.apache.hadoop.util.Shell.execCommand(Shell.java:849)
    at org.apache.hadoop.fs.RawLocalFileSystem.setPermission(RawLocalFileSystem.java:733)
    at org.apache.hadoop.fs.RawLocalFileSystem$LocalFSFileOutputStream.<init>(RawLocalFileSystem.java:225)
    at org.apache.hadoop.fs.RawLocalFileSystem$LocalFSFileOutputStream.<init>(RawLocalFileSystem.java:209)
    at org.apache.hadoop.fs.RawLocalFileSystem.createOutputStreamWithMode(RawLocalFileSystem.java:307)
    at org.apache.hadoop.fs.RawLocalFileSystem.create(RawLocalFileSystem.java:296)
    at org.apache.hadoop.fs.RawLocalFileSystem.create(RawLocalFileSystem.java:328)
    at org.apache.hadoop.fs.FileSystem.primitiveCreate(FileSystem.java:1017)
    at org.apache.hadoop.fs.DelegateToFileSystem.createInternal(DelegateToFileSystem.java:99)
    at org.apache.hadoop.fs.ChecksumFs$ChecksumFSOutputSummer.<init>(ChecksumFs.java:352)
    at org.apache.hadoop.fs.ChecksumFs.createInternal(ChecksumFs.java:399)
    at org.apache.hadoop.fs.AbstractFileSystem.create(AbstractFileSystem.java:584)
    at org.apache.hadoop.fs.FileContext$3.next(FileContext.java:686)
    at org.apache.hadoop.fs.FileContext$3.next(FileContext.java:682)
    at org.apache.hadoop.fs.FSLinkResolver.resolve(FSLinkResolver.java:90)
    at org.apache.hadoop.fs.FileContext.create(FileContext.java:688)
    at org.apache.spark.sql.execution.streaming.FileContextBasedCheckpointFileManager.createTempFile(CheckpointFileManager.scala:311)
    at org.apache.spark.sql.execution.streaming.CheckpointFileManager$RenameBasedFSDataOutputStream.<init>(CheckpointFileManager.scala:133)
    at org.apache.spark.sql.execution.streaming.CheckpointFileManager$RenameBasedFSDataOutputStream.<init>(CheckpointFileManager.scala:136)
    at org.apache.spark.sql.execution.streaming.FileContextBasedCheckpointFileManager.createAtomic(CheckpointFileManager.scala:318)
    at org.apache.spark.sql.execution.streaming.StreamMetadata$.write(StreamMetadata.scala:78)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$2.apply(StreamExecution.scala:125)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$2.apply(StreamExecution.scala:123)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.sql.execution.streaming.StreamExecution.<init>(StreamExecution.scala:123)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.<init>(MicroBatchExecution.scala:48)
    at org.apache.spark.sql.streaming.StreamingQueryManager.createQuery(StreamingQueryManager.scala:281)
    at org.apache.spark.sql.streaming.StreamingQueryManager.startQuery(StreamingQueryManager.scala:322)
    at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:325)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)

Could you offer any suggestions? Note that my other Spark Streaming applications run perfectly fine; it was only when I decided to move to Structured Streaming that I ran into this problem.

Tags: windows, pyspark, jupyter-notebook, spark-structured-streaming

Solution
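
A likely cause, given the `(null) entry in command string: null chmod 0644` message, is that Hadoop's native Windows binaries (winutils.exe, and usually hadoop.dll) are missing. On Windows, Hadoop's Shell class delegates file-permission commands such as chmod to winutils.exe, and Structured Streaming hits this code path as soon as a query starts, because it writes checkpoint metadata to the local filesystem; classic DStream jobs that never checkpoint do not, which would explain why they ran fine. When winutils.exe cannot be located via HADOOP_HOME, the command string is built as null and fails with exactly this IOException.

Below is a minimal sketch of the usual fix, assuming you have downloaded winutils.exe (and hadoop.dll) matching the Hadoop version bundled with your Spark build and placed them under C:\hadoop\bin; that path is an assumption, so adjust it to your machine:

import os

# Assumption: winutils.exe and hadoop.dll for your Hadoop version
# live in C:\hadoop\bin -- change this path to match your setup.
os.environ["HADOOP_HOME"] = r"C:\hadoop"
os.environ["PATH"] = r"C:\hadoop\bin;" + os.environ["PATH"]

from pyspark.sql import SparkSession

# These variables must be set before the JVM starts, i.e. before the
# first SparkSession is created in this kernel, because py4j launches
# the JVM only once per session.
spark = SparkSession.builder.appName("StructuredNetworkCount").getOrCreate()

Setting HADOOP_HOME and PATH as system-level environment variables and then restarting the Jupyter kernel achieves the same thing and is more robust, since the JVM reads them only at launch. If the error persists, make sure hadoop.dll is also on the PATH (or copied into C:\Windows\System32) so the native library can be loaded.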

