scala - Spark Streaming, reading from Socket: java.lang.ClassCastException: java.lang.String cannot be cast to org.apache.spark.unsafe.types.UTF8String
Problem description
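On Windows 10, I'm trying to use Spark Streaming (Spark 2.4.4) to read multiple lines of text, separated by '\n', from a TCP socket source (for testing purposes so far). The words should be counted, and the current word counts should be shown on the console periodically. This is a standard Spark streaming test found in several books and web posts, but it seems to fail with the socket source: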
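The text strings are sent from a Java program, for example:
```java
import java.io.OutputStreamWriter;
import java.net.ServerSocket;
import java.net.Socket;

ServerSocket serverOutSock = new ServerSocket(9999);
// Establish connection; wait for Spark to connect
Socket sockOut = serverOutSock.accept();
// Set UTF-8 as format
OutputStreamWriter sockOutput = new OutputStreamWriter(sockOut.getOutputStream(), "UTF-8");
// Multiple Java Strings are now written (thousands of them) like
sockOutput.write(string + '\n');
```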
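For reference, the newline-delimited UTF-8 protocol the sender is meant to implement can be exercised locally with the JDK alone. This is a minimal sketch under stated assumptions: LineSender, serveLines and roundTrip are hypothetical names, the port is picked by the OS instead of 9999, and the reading side uses BufferedReader.readLine as a stand-in for a line-oriented receiver; it does not reproduce Spark's socket source, which the Spark docs describe as producing one row per received line in a column named "value". The sketch also calls flush(), which the snippet above does not show: OutputStreamWriter accumulates encoded bytes in an internal buffer, so written lines may sit there until the buffer fills, the writer is flushed, or it is closed.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.UncheckedIOException;
import java.io.Writer;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper class, not part of the original program.
class LineSender {

    /** Accepts one client and writes each string as one '\n'-terminated UTF-8 line. */
    static void serveLines(ServerSocket server, List<String> lines) throws IOException {
        try (Socket client = server.accept();
             Writer out = new OutputStreamWriter(client.getOutputStream(), StandardCharsets.UTF_8)) {
            for (String line : lines) {
                out.write(line);
                out.write('\n');
            }
            // OutputStreamWriter buffers encoded bytes internally;
            // flush so they are pushed to the socket now rather than at close().
            out.flush();
        }
    }

    /** Serves the lines on an ephemeral port and reads them back line by line. */
    static List<String> roundTrip(List<String> lines) throws IOException, InterruptedException {
        List<String> received = new ArrayList<>();
        try (ServerSocket server = new ServerSocket(0)) {
            Thread sender = new Thread(() -> {
                try {
                    serveLines(server, lines);
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            });
            sender.start();
            // Stand-in receiver: read UTF-8 text until the sender closes the connection.
            try (Socket socket = new Socket(InetAddress.getLoopbackAddress(), server.getLocalPort());
                 BufferedReader in = new BufferedReader(
                         new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8))) {
                String line;
                while ((line = in.readLine()) != null) {
                    received.add(line);
                }
            }
            sender.join();
        }
        return received;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(roundTrip(List.of("hello world", "hello spark")));
    }
}
```

Running main should print the two lines back as a list, which is a quick way to confirm that every string written by the sender actually arrives as a separate line.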
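On the Spark receiving side, the Scala code looks like this:
```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

val spark = SparkSession.builder.master("local[*]").getOrCreate()
import spark.implicits._

// Each line received on the socket becomes one row in a string column named "value"
val socketDF = spark.readStream.format("socket").option("host", "localhost").option("port", 9999).load()
val words = socketDF.as[String].flatMap(_.split(" ")).coalesce(1)
val wordCounts = words.groupBy("value").count()

// Continuous-processing trigger; "1 second" is its checkpoint interval
val query = wordCounts.writeStream
  .trigger(Trigger.Continuous("1 second"))
  .outputMode("complete")
  .format("console")
  .start()
query.awaitTermination()
```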
So I want the current word counts written to the console once per second.
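To make the intended result concrete, the computation the query is supposed to perform can be sketched without Spark. RunningWordCount and addBatch are hypothetical names; each addBatch call stands in for the new input of one trigger, and the returned map is what complete output mode is meant to print, namely the whole table of running counts rather than only the changes. Splitting uses Java's String.split(" "), which is the method Scala's _.split(" ") resolves to on a String, so empty tokens behave the same way.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, Spark-free model of the word-count query; not Spark's implementation.
class RunningWordCount {
    // word -> running count; sorted only to make the printed table deterministic
    private final Map<String, Long> counts = new TreeMap<>();

    /** Folds one trigger's worth of lines into the running counts and returns a copy of the full table. */
    public Map<String, Long> addBatch(List<String> lines) {
        for (String line : lines) {
            // Regex split on a single space: keeps a leading empty token (" a" -> ["", "a"])
            // and drops trailing empty tokens, exactly like _.split(" ") in the Scala code
            for (String word : line.split(" ")) {
                counts.merge(word, 1L, Long::sum);
            }
        }
        return new TreeMap<>(counts);
    }

    public static void main(String[] args) {
        RunningWordCount wc = new RunningWordCount();
        System.out.println(wc.addBatch(List.of("to be or", "not to be")));
        System.out.println(wc.addBatch(List.of("be")));
    }
}
```

The first call returns {be=2, not=1, or=1, to=2}; the second returns {be=3, not=1, or=1, to=2}, i.e. each trigger reports the cumulative table, which is the behavior the console output was expected to show.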
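But I get an error:
java.lang.ClassCastException: java.lang.String cannot be cast to org.apache.spark.unsafe.types.UTF8String
and Spark does not seem to process anything from the source (because of the cast exception on the source input?). At least nothing is written to the console. What could be the cause?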
The full stack trace is:
```
Exception in thread "null-4" java.lang.ClassCastException: java.lang.String cannot be cast to org.apache.spark.unsafe.types.UTF8String
at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow$class.getUTF8String(rows.scala:46)
at org.apache.spark.sql.catalyst.expressions.GenericInternalRow.getUTF8String(rows.scala:195)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$11$$anon$1.hasNext(WholeStageCodegenExec.scala:619)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$11$$anon$1.hasNext(WholeStageCodegenExec.scala:619)
at org.apache.spark.sql.execution.streaming.continuous.shuffle.RPCContinuousShuffleWriter.write(RPCContinuousShuffleWriter.scala:51)
at org.apache.spark.sql.execution.streaming.continuous.ContinuousCoalesceRDD$$anonfun$4$$anon$1.run(ContinuousCoalesceRDD.scala:112)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
```
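The top two frames show Spark's generic internal row (GenericInternalRow.getUTF8String) casting a stored field to UTF8String, and the frames further down belong to the continuous-mode coalesce path (ContinuousCoalesceRDD, RPCContinuousShuffleWriter). Spark's internal rows represent string columns as org.apache.spark.unsafe.types.UTF8String, so a row carrying a plain java.lang.String fails at exactly that cast, at the point where the field is read rather than where the row was built. The stdlib-only analogy below illustrates only that mechanism; GenericRow, Utf8Text and read are hypothetical stand-ins, not Spark classes, and the sketch makes no claim about which Spark component put the String into the row.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical analogy of the failure mode; none of these classes exist in Spark.
class RowCastDemo {

    /** Hypothetical stand-in for an internal UTF-8 string type. */
    static final class Utf8Text {
        private final byte[] bytes;
        Utf8Text(String s) { this.bytes = s.getBytes(StandardCharsets.UTF_8); }
        @Override public String toString() { return new String(bytes, StandardCharsets.UTF_8); }
    }

    /** Hypothetical generic row: fields are stored as Object, with no type check on construction. */
    static final class GenericRow {
        private final Object[] values;
        GenericRow(Object... values) { this.values = values; }

        /** Typed accessor: this cast is where a wrongly typed field is finally detected. */
        Utf8Text getUtf8Text(int ordinal) { return (Utf8Text) values[ordinal]; }
    }

    /** Returns the field as text, or the exception's simple class name if the typed read fails. */
    static String read(GenericRow row) {
        try {
            return row.getUtf8Text(0).toString();
        } catch (ClassCastException e) {
            return e.getClass().getSimpleName();
        }
    }

    public static void main(String[] args) {
        System.out.println(read(new GenericRow(new Utf8Text("hello")))); // hello
        System.out.println(read(new GenericRow("hello")));               // ClassCastException
    }
}
```

Building the row with a plain String succeeds; only the typed read throws, which matches a trace whose top frames are the row's getter rather than the code that produced the row.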
I tried removing coalesce(1) and replacing the Continuous trigger with a ProcessingTime trigger. That makes the error go away, but the console output becomes:
```
Batch: 0
+-----+-----+
|value|count|
+-----+-----+
+-----+-----+
```
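That is, there is no output, even though many words were definitely written to the socket. Moreover, this output appears only once, and much later than after 1 second.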