首页 > 解决方案 > 消息的手动确认(检查点):Spring Cloud Stream Kenesis Binder

问题描述

我们正在尝试将使用来自 Kafka 的消息的 Spring Cloud Stream 应用程序移植到 AWS Kenesis。我们需要手动确认来处理某些超时条件。

对于 Kafka,我们使用属性autocommitoffset为 false 并使用 ACKNOWLEDGMENT 标头来处理手动确认。

我浏览了 Spring Cloud Stream 的文档,浏览了以下内容: https://dataflow.spring.io/docs/recipes/kinesis/simple-producer-consumer/ https://github.com/spring-cloud/spring- cloud-stream-binder-aws-kinesis/blob/master/spring-cloud-stream-binder-kinesis-docs/src/main/asciidoc/overview.adoc

但找不到任何解决方案。任何指针都会非常有帮助。

标签: spring-cloud-streamspring-cloud-dataflow

解决方案


经过一番搜索,找到了解决方案:

在 Kenesis 中, shard等价于partitioncheckpointoffset

在应用 Yml 中:

spring:
  cloud:
    stream:
      kinesis:
        bindings:
          consumer-in-0:
            consumer:
              checkpointMode: manual

检查点的示例代码

 @Bean
    public Consumer<Message<String>> consume() {
        return message -> {
            System.out.println("message received : "+ message.getPayload());
            System.out.println("message headers : "+ message.getHeaders());
            Checkpointer checkPointer = (Checkpointer) message.getHeaders().get(AwsHeaders.CHECKPOINTER);
            checkPointer.checkpoint();
           
        };
    }

引用的Kenesis Consumer Binder 属性


推荐阅读