mule - Mule Anypoint studio Kafka 连接器 - 消费者无法接收消息
问题描述
Mule 应用程序(Any Point Studio 应用程序)正在消费 API 应用程序通过 Kafka 发布的消息,
但是 Mule(Any Point Studio) Kafka Consumer 无法接收消息。每次我停止启动应用程序以使用消息时,即使我们这样做,我们也会收到旧消息,包括新消息,
使用 Mule 3.9 版本,kafka-client 0.10.0.0
我尝试将一些属性添加到 consumer.properties 文件中,例如 poll。
Consumer.Properties having below code,
group.id=user
auto.offset.reset=earliest
enable.auto.commit=false
Consumer.Properties having below code,
group.id=user
在 Any point studio flow 中,Kafka Connector [Consumer] -> 给定主题名称为 customer_data,分区为 1 。没有提供任何偏移量
我希望消费者应该在不重新启动应用程序的情况下阅读消息,并且不应该再次收到旧消息
解决方案
问题是,如果您的 enable.auto.commit 为 true,mule 只会管理偏移量。保持真实是个坏主意,因为即使您的 muleflow 失败,它也会提交偏移量。所以你通过禁用它做了正确的事情。
问题来了——在 enable.auto.commit=false 之后,您基本上应该自己管理偏移量。我遇到了类似的问题,并考虑创建自定义类,该类将在成功执行 mule 流后显式调用 sync()。
推荐阅读
- python - 将 pandas DataFrame 转换为嵌套的 JSON 数组
- mysql - MySQL 5.7 AUTO_INCREMENT 生成重复值
- typescript - 我应该对这两个打字稿功能进行单元测试吗?如果是这样,为什么(或如何)。我正在使用 JEST
- boost - Boost.Container `dlmalloc` 和 `jemalloc`
- r - 检测向量中值的变化
- reactjs - combineReducers 以获得更好的 redux 存储命名
- python - 在 Python 环境中安装 Scrapy 包
- amazon-web-services - 如何在 Terraform 配置和 Packer 模板之间共享配置?
- javascript - 如何修复 VueJs 应用程序中的“SyntaxError: let is a reserved identifier”
- mysql - MYSQL 插入多行的 SQL 语句