apache-flink - Apache Flink 1.3.2 与 Kafka 1.1.0 的连接问题
问题描述
我正在使用 Apache Flink 1.3.2 集群。我们正在使用 Kafka 消息,并且自从将代理升级到 1.1.0(从 0.10.2)以来,我们经常在日志中注意到这个错误:
ERROR o.a.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase - Async Kafka commit failed.
org.apache.kafka.clients.consumer.RetriableCommitFailedException: Offset commit failed with a retriable exception. You should retry committing offsets.
Caused by: org.apache.kafka.common.errors.DisconnectException: null
因此,有时我们会在处理过程中遇到丢失事件。我们在工作中使用 FlinkKafkaConsumer010。
检查点已启用(间隔 10 秒,超时 1 分钟,检查点之间的最小暂停 5 秒,最大并发检查点 1。E2E 持续时间平均低于 1 秒,甚至我会说不到半秒。)Kafka 0.10 使用了相同的设置。 2 我们没有这个例外。
更新:我们已经重新安装了 Kafka,现在我们收到一条警告消息,但仍然没有读取任何事件
WARN o.a.flink.streaming.connectors.kafka.internal.Kafka09Fetcher - Committing offsets to Kafka takes longer than the checkpoint interval. Skipping commit of previous offsets because newer complete checkpoint offsets are available. This does not compromise Flink's checkpoint integrity.
解决方案
原来这是由我们在 AWS 中遇到的一些连接问题引起的。该框架适用于 Kafka 1.1
推荐阅读
- javascript - Javascript向选择框添加一个值
- python-xarray - 如何在 xarray 的滚动窗口上应用自定义函数?
- c# - 如何将某些东西移动到“另一个程序集”?
- spacy - 尝试使用 spacy 的英文包时遇到错误
- mongodb - 如何在一个文档 MongoDB 中找到具有相似字段名称的子字段
- laravel-5 - Laravel 调度程序无法在 Dreamhost VPS 中运行
- json - 如何在flutter中更新本地json字段
- javascript - 如何在 vuejs 中使用循环包装 2 个或更多 html 元素
- ansible - 如何获取结果并通过 ansible 中的字典传递
- python - 拆分后获取子字符串的值