scala - 如何使用 spark 将来自 kafka 主题的流数据写入 hdfs?
问题描述
我一直试图让这段代码工作几个小时:
val spark = SparkSession.builder()
.appName("Consumer")
.getOrCreate()
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", url)
.option("subscribe", topic)
.load()
.select("value")
.writeStream
.format(fileFormat)
.option("path", filePath)
.option("checkpointLocation", "/tmp/checkpoint")
.start()
.awaitTermination()
它给出了这个例外:
Logical Plan:
Project [value#8]
+- StreamingExecutionRelation KafkaV2[Subscribe[MyTopic]], [key#7, value#8, topic#9, partition#10, offset#11L, timestamp#12, timestampType#13]
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:295)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189)
Caused by: java.lang.ClassCastException: org.apache.spark.sql.execution.streaming.SerializedOffset cannot be cast to org.apache.spark.sql.sources.v2.reader.streaming.Offset
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$1$$anonfun$apply$9.apply(MicroBatchExecution.scala:405)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$1$$anonfun$apply$9.apply(MicroBatchExecution.scala:390)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at org.apache.spark.sql.execution.streaming.StreamProgress.foreach(StreamProgress.scala:25)
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
at org.apache.spark.sql.execution.streaming.StreamProgress.flatMap(StreamProgress.scala:25)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$1.apply(MicroBatchExecution.scala:390)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$1.apply(MicroBatchExecution.scala:390)
at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch(MicroBatchExecution.scala:389)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:133)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121)
at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:121)
at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:117)
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:279)
我不明白发生了什么,我只是尝试使用火花流将来自 kafka 主题的数据写入 HDFS。为什么这么难?我该怎么做?
我让批处理版本工作得很好:
spark.read
.format("kafka")
.option("kafka.bootstrap.servers", url)
.option("subscribe", topic)
.load()
.selectExpr("CAST(value AS String)")
.write
.format(fileFormat)
.save(filePath)
解决方案
@happy您在结构化流中遇到了一个已知错误https://issues.apache.org/jira/browse/SPARK-25257
这是因为磁盘的偏移量永远不会反序列化,并且该修复将在即将发布的版本中合并
推荐阅读
- javascript - 在 Openlayers 中动态更改图层的可见性
- sql - 如何将多个逗号分隔的字符串转换为具有多列的表
- java - 当尝试使用 primary_key 作为 Hibernate 中复合键的一部分时,字段 'index_id' 没有默认值
- python - 在 Python 3 中使用 Zip 每 2 个字符拆分字符串
- compilation - 动态链接和静态链接的指令点值
- javascript - JSON - 将子子项作为 IF 语句的条件
- r - R找到ecdf函数的倒数
- scikit-learn - 为什么我的 train_test_split 不适用于分层选项?
- python - 如何用函数评估方程?
- python - 如何在 Paramiko 服务器中实现端口转发?