apache-spark - 使用 Spark 和 Elastic Search 时在 Kafka 中存储偏移的正确方法
问题描述
我对此做了很多研究,但我仍然无法找到合适的东西。我到处走,我看到最简单的方法是调用saveToEs()
然后提交偏移量。我的问题是如果saveToEs()
由于某种原因失败怎么办?
当我们使用 Spark 流作业并将文档存储在 ES 中时,在 Kafka 中存储偏移量的正确方法是什么。我尝试BulkProcessorListener
手动使用和存储偏移量(跟踪排序的偏移量和请求等等),但它失控了,对于这样的一般任务,这种方法似乎很复杂。
有人可以指导我吗?
任何对我的方法感兴趣的人,这里是解释它的问题 Commit Offsets to Kafka on Spark Executors
解决方案
推荐阅读
- android - 使用 jetpack compose 的麻烦
- angular - 在角度 8 中使用 NeoVis 绘制 Neo4j 图形
- python - Python send_message() 缺少 1 个必需的位置参数:“文本”
- rust - 通过计算自身当前地址的固定偏移量来进行自我引用
- javascript - 使用 javascript 从下拉菜单中选择键,然后选择值(tom 选择)
- java - 在 Elasticsearch 中使用多个分析器创建和合并索引
- php - CakePHP 4 - 保存关联数据不适用于 3 个简单的表
- javascript - Safari 浏览器上第三方 cookie 的解决方法
- python - plt.set_title() 中的乱码标题字符串
- python - 如何使用 Scapy 发送 TLS ClientHello?