Spark Streaming, reading from a socket: java.lang.ClassCastException: java.lang.String cannot be cast to org.apache.spark.unsafe.types.UTF8String

Problem description

On Windows 10, I am trying to read multiple '\n'-separated text lines from a TCP socket source (for test purposes so far) using Spark Streaming (Spark 2.4.4). The words should be counted and the current word count displayed on the console periodically. This is a standard test of Spark streaming, found in several books and web posts, but it seems to fail on the socket source:

The text strings are sent from a Java program, e.g.:

serverOutSock = new ServerSocket(9999);
// Establish connection; wait for Spark to connect
sockOut = serverOutSock.accept();
// Set UTF-8 as format
sockOutput = new OutputStreamWriter(sockOut.getOutputStream(),"UTF-8");
// Multiple Java Strings are now written (thousands of them) like
sockOutput.write(string+'\n');
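
Incidentally, the sender above never flushes: OutputStreamWriter buffers bytes internally, so short lines can sit in the buffer and never reach Spark, which by itself can produce an empty console on the receiving side. A minimal, self-contained sketch of a newline-delimited UTF-8 sender that flushes each line (the LineSender name and the sample strings are illustrative; port 9999 as above):

```scala
import java.io.{BufferedWriter, OutputStreamWriter}
import java.net.ServerSocket
import java.nio.charset.StandardCharsets

object LineSender {
  def main(args: Array[String]): Unit = {
    val server = new ServerSocket(9999)
    val sock   = server.accept()    // wait for Spark to connect
    val out = new BufferedWriter(
      new OutputStreamWriter(sock.getOutputStream, StandardCharsets.UTF_8))
    for (line <- Seq("hello world", "hello spark")) {
      out.write(line)
      out.write('\n')               // the socket source splits records on newlines
      out.flush()                   // without this, data may never leave the buffer
    }
    out.close(); sock.close(); server.close()
  }
}
```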

On the Spark receiving side, the Scala code looks like this:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

val spark = SparkSession.builder.master("local[*]").getOrCreate()
import spark.implicits._
val socketDF = spark.readStream.format("socket").option("host","localhost").option("port",9999).load
val words = socketDF.as[String].flatMap(_.split(" ")).coalesce(1)
val wordCounts = words.groupBy("value").count()
val query = wordCounts.writeStream
      .trigger(Trigger.Continuous("1 second"))
      .outputMode("complete")
      .format("console")
      .start
      .awaitTermination

So I want the current word count written to the console once per second.

But I get an error:

java.lang.ClassCastException: java.lang.String cannot be cast to org.apache.spark.unsafe.types.UTF8String

and Spark does not seem to process anything from the source (because of the cast exception on the source input?). At least nothing is written out on the console. What could be the reason for this?

The full stack trace follows:

Exception in thread "null-4" java.lang.ClassCastException: java.lang.String cannot be cast to org.apache.spark.unsafe.types.UTF8String
        at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow$class.getUTF8String(rows.scala:46)
        at org.apache.spark.sql.catalyst.expressions.GenericInternalRow.getUTF8String(rows.scala:195)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$11$$anon$1.hasNext(WholeStageCodegenExec.scala:619)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
        at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$11$$anon$1.hasNext(WholeStageCodegenExec.scala:619)
        at org.apache.spark.sql.execution.streaming.continuous.shuffle.RPCContinuousShuffleWriter.write(RPCContinuousShuffleWriter.scala:51)
        at org.apache.spark.sql.execution.streaming.continuous.ContinuousCoalesceRDD$$anonfun$4$$anon$1.run(ContinuousCoalesceRDD.scala:112)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

I tried removing coalesce(1) and replacing the Continuous trigger with a ProcessingTime trigger. This makes the error go away, but the console output becomes:


Batch: 0

+-----+-----+
|value|count|
+-----+-----+
+-----+-----+

That is, there is no output, even though many words were indeed injected into the socket. Moreover, this output appears only once, and much later than 1 second in.

Tags: scala, sockets, apache-spark, spark-streaming

Solution
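
A likely explanation: the stack trace runs through RPCContinuousShuffleWriter and ContinuousCoalesceRDD, i.e. the continuous-processing shuffle path. In Spark 2.4, continuous processing supports only map-like operations (select, filter, map, flatMap and the like); aggregations such as groupBy(...).count() are not supported, so combining Trigger.Continuous with coalesce(1) and a groupBy drives the query down a code path that fails in this way. A sketch of the same query under the default micro-batch engine, which does support complete-mode aggregation (assuming the sender flushes each line as noted above):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

object WordCount {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.master("local[*]").getOrCreate()
    import spark.implicits._

    val socketDF = spark.readStream.format("socket")
      .option("host", "localhost")
      .option("port", 9999)
      .load()

    // groupBy/count is an aggregation; in Spark 2.4 it needs the micro-batch engine
    val wordCounts = socketDF.as[String]
      .flatMap(_.split(" "))
      .groupBy("value")
      .count()

    wordCounts.writeStream
      .trigger(Trigger.ProcessingTime("1 second")) // micro-batch trigger, not Continuous
      .outputMode("complete")
      .format("console")
      .start()
      .awaitTermination()
  }
}
```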

