spring-boot - Spring Cloud Stream:Headers kafka_acknowledgement 值为 null
问题描述
我试图根据应用程序中消息的处理状态来控制 kafka 主题的偏移量提交。如果消息成功,则可以提交偏移量。为此,我试图在我的方法中获取标题,如果可以手动确认该消息
spring:
cloud:
stream:
default:
contentType: application/json
default-binder: binder1-kafka
bindings:
myChannel:
binder: binder1-kafka
destination: my_topic
content-type: text/plain
consumer:
autoCommitOffset: false
outChannel:
binder: binder2-kafka
destination: my_topic
content-type: text/plain
consumer:
autoCommitOffset: false
binders:
#Connection config to different clusters
binder1-kafka:
type: kafka
defaultCandidate: true
environment:
spring:
cloud:
stream:
kafka:
binder:
brokers: some-url1:9092
binder2-kafka:
type: kafka
defaultCandidate: false
environment:
spring:
cloud:
stream:
kafka:
binder:
brokers: some-url2:9092
但是当使用监听器作为
@StreamListener(target = IBrokerChannel.myChannel )
public void handlePayload(@Payload MyPayload payload, @Headers Map<String, Object> headers) {
Acknowledgment acknowledgment= (Acknowledgment) headers.get("kafka_acknowledgment"); // acknowledgment object is always null.
acknowledgment.acknowledge();
}
acknowledgment
总是null
。我正在使用 kafka 生产者 cli 将消息发送到主题。spring-boot
版本是1.5.10.RELEASE
解决方案
您的配置属性定义中缺少kafka
分支。它必须是这样的:
spring:
cloud:
stream:
default-binder: kafka
kafka:
bindings:
myChannel:
consumer:
autoCommitOffset: false
推荐阅读
- r - 使用summary.table时如何修复'object'必须从类“table”继承?
- php - 子域路由覆盖并运行 web.php 中的最后一个子域
- excel - 仅当行包含一个值时,将特定行复制到另一个工作表
- javascript - 如何使用 css 模糊图像标签的一部分,我没有使用背景图像的选项
- c# - Entity Framework Core 从 SQL 的一对多映射
- java - 将 Java 地图转换为 SearchSourceBuilder Elasticsearch 7.2 Java 高级 API
- flowtype - 使用可选 id 操作 Flow 泛型类型列表
- c# - 修改 SelectList 中项目中的文本
- java - 如何逆时针旋转一圈画布android
- javascript - 如何将猫鼬承诺循环限制为几次?