apache-spark - Spark-Streaming 最早因 kafka 起始偏移而挂起(Kafka 2,spark 2.4.3)
问题描述
我遇到了 Spark-Streaming 和 Kafka 的问题。在运行示例程序以从 Kafka 主题消费并将微批处理结果输出到终端时,当我设置选项时,我的工作似乎挂起:
df.option("startingOffsets", "earliest")
从最新的偏移量开始工作可以正常工作,结果会在每个微批次流过时打印到终端。
我在想这可能是一个资源问题——我正在尝试从一个包含大量数据的主题中读取。但是我似乎没有内存/cpu 问题(使用本地 [*] 集群运行此作业)。这项工作似乎从未真正开始,而只是挂在网上:
19/09/17 15:21:37 INFO Metadata: Cluster ID: JFXVL24JQ3K4CEbE-VA58A
val sc = new SparkConf().setMaster("local[*]").setAppName("spark-test")
val streamContext = new StreamingContext(sc, Seconds(1))
val spark = SparkSession.builder().appName("spark-test")
.getOrCreate()
val topic = "topic.with.alotta.data"
//subscribe tokafka
val df = spark.readStream.format("kafka")
.option("kafka.bootstrap.servers", "127.0.0.1:9092")
.option("subscribe", topic)
.option("startingOffsets", "earliest")
.load()
//write
df.writeStream
.outputMode("append")
.format("console")
.option("truncate", "false")
.start()
.awaitTermination()
我希望看到打印到控制台的结果....但是,应用程序似乎就像我提到的那样挂起。有什么想法吗?感觉就像一个火花资源问题(因为我正在针对一个有大量数据的主题运行一个本地“集群”。我是否缺少流数据帧的性质?
解决方案
写入控制台会导致每次触发时将所有数据收集到驱动程序的内存中。由于您当前没有限制批次的大小,这意味着整个主题内容正在累积在驱动程序中。请参阅https://spark.apache.org/docs/2.4.3/structured-streaming-programming-guide.html#output-sinks
设置批量大小的限制应该可以解决您的问题。maxOffsetsPerTrigger
从 Kafka 读取时尝试添加设置...
val df = spark.readStream.format("kafka")
.option("kafka.bootstrap.servers", "127.0.0.1:9092")
.option("subscribe", topic)
.option("startingOffsets", "earliest")
.option("maxOffsetsPerTrigger", 1000)
.load()
有关详细信息,请参阅https://spark.apache.org/docs/2.4.3/structured-streaming-kafka-integration.html。
推荐阅读
- java - Java 项目的输出在 Windows 命令提示符下无法正确打印 ä 或 ö
- firebase - Flutter 检查用户名是否已存在于 Firestore 数据库中
- c# - Html 敏捷包寻址
- javascript - 未捕获的 ReferenceError:未定义参数名称
- stenciljs - 不可变的 StencilJS 道具在更改时重新渲染
- python - 如何使用 python-docx 更改标题的样式?
- php - 多对多数组集合形式 Symfony
- python - 如何填充 PIL 绘制的自相交多边形?
- laravel - Laravel - 无法通过“何时”获得收集结果
- php - 如何在 PHP/HTML 中有多个搜索输入/值