apache-spark - 引发流无法写入 hdfs 路径
问题描述
我使用 java 1.8 和 kafka 0.10.x 一起使用 spark-sql-2.4.1v。
Dataset<Row> dataSet= sparkSession
.readStream()
.format("kafka")
.option("subscribe", INFO_TOPIC)
.option("startingOffsets", "latest")
.option("enable.auto.commit", false)
.option("maxOffsetsPerTrigger", 1000)
.option("auto.offset.reset", "latest")
.option("failOnDataLoss", false)
.load();
StreamingQuery query = dataSet.writeStream()
.format(PARQUET_FORMAT)
.option("path", parqetFileName)
.option("checkpointLocation", checkPtLocation)
.trigger(Trigger.ProcessingTime("15 seconds"))
.start();
query.awaitTermination();
将数据写入我的 hdfs 路径(即 parqetFileName)后,它失败并出现以下错误。
[DataStreamer for file /user/parquet/raw/part-00001-7cba7fa3-a98f-442d-9584-b71085b7cd82-c000.snappy.parquet] WARN org.apache.hadoop.hdfs.DataStreamer - Caught exception
java.lang.InterruptedException
at java.lang.Object.wait(Native Method)
at java.lang.Thread.join(Thread.java:1249)
at java.lang.Thread.join(Thread.java:1323)
at org.apache.hadoop.hdfs.DataStreamer.closeResponder(DataStreamer.java:980)
at org.apache.hadoop.hdfs.DataStreamer.endBlock(DataStreamer.java:630)
at org.apache.hadoop.hdfs.DataStreamer.run(DataStreamer.java:807)
这里有什么问题以及如何解决?
解决方案
您的代码中必须包含streamContext.awaitTermination()
- 否则应用程序将在启动流后立即退出。
推荐阅读
- reactjs - 在 react-router 4 中单击导航链接后如何加载页面
- thorntail - 如何使用 project-default.yml 配置 Thorntail http 端口?
- javascript - 返回 false 但提交
- android - Android NDK - 从 dlsym() 仅加载主 DLL\SO 的符号
- javascript - 将函数变量传递给第二个函数会更改变量值
- python - 在 keras 中使用 ANN 制作了奇数分类器,没有得到好的结果?
- http2 - 序列化的头块如何划分?
- plantuml - 矩形后回车
- ruby-on-rails - Rails 发电机没有为引擎加载
- laravel - laravel php artisan serve 命令有效,但 localhost url 在 ubuntu 18.10 中不起作用