首页 > 解决方案 > 使用 Spark 和 Elastic Search 时在 Kafka 中存储偏移的正确方法

问题描述

我对此做了很多研究,但我仍然无法找到合适的东西。我到处走,我看到最简单的方法是调用saveToEs()然后提交偏移量。我的问题是如果saveToEs()由于某种原因失败怎么办?

当我们使用 Spark 流作业并将文档存储在 ES 中时,在 Kafka 中存储偏移量的正确方法是什么。我尝试BulkProcessorListener手动使用和存储偏移量(跟踪排序的偏移量和请求等等),但它失控了,对于这样的一般任务,这种方法似乎很复杂。

有人可以指导我吗?

任何对我的方法感兴趣的人,这里是解释它的问题 Commit Offsets to Kafka on Spark Executors

标签: apache-sparkelasticsearchapache-kafkaspark-streamingspark-streaming-kafka

解决方案


推荐阅读