spring-integration - 如何使用spring集成kafka确认消费者读取的kafka消息
问题描述
我们正在使用 spring-integration-kafka 版本 3.1.2.RELEASE 和 int-kafka:message-driven-channel-adapter 来使用来自远程 kafka 主题的消息。生产者发送加密消息,我们正在使用解串器解密实际消息。我们能够消费主题中发布的所有消息。我们使用了自动提交作为假。我们想知道在成功处理消息后如何提交或确认来自我们服务的消息。有人可以帮助我们如何提交从消息驱动通道读取的消息并提供一些参考实现吗?
当我们将自动提交设置为 true 时,我们假设它将在提交间隔之后提交消息,但我们希望在我们的服务中处理它。我遇到了下面的示例,但是我们在反序列化之后收到了一个自定义对象,而不是 spring 集成消息。所以我们想知道如何在转换器中实现类似的确认,这样我们就不会在转换过程中出现任何错误时提交消息。转换成功后提交消息。
Acknowledgment acknowledgment = message.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
if(acknowledgment != null) { System.out.println("Acknowledgment provided");
acknowledgment.acknowledge(); }
}
<int-kafka:message-driven-channel-adapter
id="kafkaMessageListener"
listener-container="kafkaMessageContainer" auto-startup="true"
phase="100" send-timeout="5000" mode="record"
message-converter="messageConverter"
recovery-callback="recoveryCallback" error-message-strategy="ems"
channel="inputFromKafkaChannel" error-channel="errorChannel" />
<int:transformer id="transformerid"
ref="transformerBean"
input-channel="inputFromKafkaChannel" method="transform"
output-channel="messageTransformer" />
<bean id="container1" class="org.springframework.kafka.listener.KafkaMessageListenerContainer">
<constructor-arg>
<bean class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
<constructor-arg>
<map>
<entry key="bootstrap.servers" value="${spring.kafka.bootstrap-servers}" />
<entry key="enable.auto.commit" value="false" />
<entry key="auto.commit.interval.ms" value="100" />
<entry key="session.timeout.ms" value="15000" />
<entry key="group.id" value="${spring.kafka.consumer.group-id}" />
<entry key="key.deserializer"
value="org.apache.kafka.common.serialization.StringDeserializer" />
<entry key="value.deserializer"
value="com.test.CustomDeserializer" />
</map>
</constructor-arg>
</bean>
</constructor-arg>
<constructor-arg>
<bean class="org.springframework.kafka.listener.ContainerProperties">
<constructor-arg name="topics" value="${spring.kafka.topics}" />
</bean>
</constructor-arg>
</bean>
解决方案
auto.commit.offset=true
表示kafka-clients
库提交偏移量。
当为 false 时(对于 Apache Kafka 首选 Spring),侦听器容器在默认情况下接收每个批次后提交偏移量,poll()
但该机制由容器的AckMode
属性控制。
请参阅提交偏移量。
如果您将容器设置AckMode
为MANUAL
或然后您的应用程序必须使用该对象MANUAL_IMMEDIATE
执行提交。Acknowledgment
使用 Spring Integration 时,该Acknowledgment
对象在KafkaHeaders.ACKNOWLEDGMENT
标头中可用。
在大多数情况下,AckMode.BATCH
(默认)或AckMode.RECORD
应该使用,并且您的应用程序不需要担心提交偏移量。
推荐阅读
- ios - 如何将 cookie 字符串从 allHTTPHeaderFields 转换为 HTTPCookie
- azure-ad-b2c - Azure AD B2C 从自定义 UI 模板中去除 html 标记
- vb.net - 如何在新线程中修改文本框值
- oracle - 用于检查值是从 http 还是 https 开始的 Oracle 过程
- vba - 如何检查范围是否包含唯一值?
- java - 批量大小不适用于 Google Guice + JPA + Hibernate
- android-studio - Android Studio:如何将代码包装在括号中
- javascript - 除了 javascript 线程之外,哪些浏览器运行 css 动画?
- netsuite - 保存时 Netsuite 高级 PDF 模板问题
- docker - 如何将多步 docker 更改为一步 docker build?