spring-cloud-stream - 消息的手动确认(检查点):Spring Cloud Stream Kenesis Binder
问题描述
我们正在尝试将使用来自 Kafka 的消息的 Spring Cloud Stream 应用程序移植到 AWS Kenesis。我们需要手动确认来处理某些超时条件。
对于 Kafka,我们使用属性autocommitoffset
为 false 并使用 ACKNOWLEDGMENT 标头来处理手动确认。
我浏览了 Spring Cloud Stream 的文档,浏览了以下内容: https://dataflow.spring.io/docs/recipes/kinesis/simple-producer-consumer/ https://github.com/spring-cloud/spring- cloud-stream-binder-aws-kinesis/blob/master/spring-cloud-stream-binder-kinesis-docs/src/main/asciidoc/overview.adoc
但找不到任何解决方案。任何指针都会非常有帮助。
解决方案
经过一番搜索,找到了解决方案:
在 Kenesis 中, shard
等价于partition
和checkpoint
offset
在应用 Yml 中:
spring:
cloud:
stream:
kinesis:
bindings:
consumer-in-0:
consumer:
checkpointMode: manual
检查点的示例代码
@Bean
public Consumer<Message<String>> consume() {
return message -> {
System.out.println("message received : "+ message.getPayload());
System.out.println("message headers : "+ message.getHeaders());
Checkpointer checkPointer = (Checkpointer) message.getHeaders().get(AwsHeaders.CHECKPOINTER);
checkPointer.checkpoint();
};
}
引用的Kenesis Consumer Binder 属性
推荐阅读
- sql - 在 Oracle SQL Developer 中搜索包含空格的字符串
- jquery - 引导下拉菜单不起作用(尽管在 bootstrap.js 之前包含 jquery)
- r - 如何比较两种不同的日期格式?
- php - Laravel 数据库查询创建界面
- ios - Clang:错误:链接器命令失败,退出代码 1(使用 -v 查看调用)Xcode 10、Xcode 10.1 Beta、Swift 4.2
- java - 如何序列化将使用 SOAP 发送到 Java 的 C# 类
- reactjs - React 如何在组件之间传递状态
- c# - 如何使用 Umbraco 检查将更新的内容放在搜索结果的顶部?
- java - 如何从结果集中为一个字段选择单行?
- python - 用不在数据集中的值替换 numpy 数组中的所有非唯一值