java - 跟踪 Spark 结构化流中的消费消息
问题描述
我想设置配置,让我的应用程序跟踪来自 kafka 的消费消息。因此,每当它失败时,它可以从最后一次提交或消耗的偏移量开始选择。
readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1")
.load()
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("topic", "topic1")
.trigger(Trigger.Continuous("1 second")) // only change in query
.start();
我在网上读到checkpointlocation
可以设置属性,火花可以使用它来跟踪偏移量。
想知道我可以在哪里设置这个属性?我可以在上面的代码中设置option
吗?我可以知道如何正确设置它。
其次,我无法理解trigger(Trigger.Continuous("1 second"))
财产。Docs 说continuous processing engine will record the progress of the query every second
,它在阅读来自 kafka 的消息时记录了什么样的进度?
解决方案
您可以将检查点位置设置为您的选项writeStream
:
[...]
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("topic", "topic1")
.option("checkpointLocation", "/path/to/dir")
.trigger(Trigger.Continuous("1 second"))
.start();
从 Kafka 读取时跟踪进度意味着跟踪 TopicPartition 中消耗的偏移量。设置检查点位置将使您的应用程序能够将该信息作为 JSON 对象存储在给定路径中,例如
{
"topic1":{
"0":11,
"1":101
}
}
这意味着应用程序已经消耗了 partition 中的 offset 10和topic0
的 partition 中的 offset 100 。检查点是“提前”写入的(使用 write-ahead-logs),因此应用程序将继续从 Kafka 读取消息,该消息在预期或意外(失败)重新启动之前中断。1
topic1
从Trigger.Continuous
Spark 版本 2.3 开始可用。并且现在标记为实验性的。与微批处理方法相比,它会在 Kafka 中的每条消息到达主题后立即获取它,而无需尝试将其与其他消息进行批处理。这可以改善延迟,但很可能会降低您的整体吞吐量。
参数(例如1 seconds
)确定检查点的频率。
使用此触发模式时,重要的是至少要有与主题分区一样多的可用内核。否则,申请将不会有任何进展。你可以在这里阅读更多关于它的信息:
“例如,如果您正在读取具有 10 个分区的 Kafka 主题,那么集群必须至少有 10 个内核才能使查询取得进展。”
推荐阅读
- javascript - 如何通过 chrome 扩展将模块脚本注入客户端的浏览器选项卡?
- laravel - 使用 redis + Horizon 时 laravel 限速作业如何工作
- python - Web 服务如何显示数组类型对象的内容
- python - 有没有更好的方法来枚举列表中的列表?
- mysql - 错误:解析 url 时出错:heroku 中未定义(Nodejs 和 sequelize)
- python - 如何生成具有特定要求的人口的随机子样本?
- excel - 使用 VBA 从不同工作表中的单元格值动态填充 Excel 表格
- linux - CPU 使用率差异
- c# - 如何创建一个开关来为 Windows 窗体选择深色主题?那可以使整个背景变暗吗?在c#中
- database - 在决定要在应用程序中跟踪哪些数据后,如何编写参数列表?