首页 > 解决方案 > Kafka S3 连接器一旦交付保证如何工作

问题描述

我已经阅读了他们的博客并理解了他们的例子。 https://www.confluent.io/blog/apache-kafka-to-amazon-s3-exactly-once/

但我正试图围绕我所拥有的这种情况。我目前的配置是:

"flush.size": "50",
"rotate.interval.ms": "-1",
"rotate.schedule.interval.ms": "300000",
"partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
"partition.duration.ms": "3600000",
"path.format": "YYYY/MM/dd/HH",
"timestamp.extractor": "Wallclock"

根据我读过的关于配置的内容。连接器将在(5 分钟)之后提交50记录文件或文件,以300000ms先到者为准。如果连接器将文件上传到 s3 但未能提交到 Kafka,那么由于我设置了轮换计划间隔,Kafka 如何重新上传将覆盖 s3 文件的相同记录?这不会导致 s3 中的重复吗?

标签: amazon-s3apache-kafkaapache-kafka-connectconfluent-platform

解决方案


S3 接收器连接器的文档是另一个很好的资源,它描述了连接器如何保证只向 S3 交付一次,更重要的是,哪些功能组合提供(或不提供)这种保证。

具体来说,该文件中的一个部分说:

为了保证与 的语义一致TimeBasedPartitioner,必须将连接器配置为使用确定性实现TimestampExtractor和确定性轮换策略。确定性时间戳提取器是 Kafka 记录 ( timestamp.extractor=Record) 或记录字段 ( timestamp.extractor=RecordField)。确定性轮换策略配置是rotate.interval.ms(设置rotate.schedule.interval.ms是非确定性的,将使一次性保证无效)。

您的 S3 接收器连接器配置确实使用了确定性分区器(通过“partitioner.class”:“io.confluent.connect.storage.partitioner.TimeBasedPartitioner”),但它使用了非确定性 Wallclock 时间戳提取器(通过"timestamp.extractor": "Wallclock")。这是不确定的,因为如果连接器确实必须重新启动(例如,由于故障)并重新处理特定记录,它将在稍后重新处理该记录,并且挂钟时间戳提取器将为该记录选择不同的时间。

其次,您的连接器使用该rotate.schedule.interval.ms选项,文档指出该选项与仅一次交付不兼容。例如,如果连接器确实必须重新处理一系列 Kafka 记录,它可能会将记录分解为与第一次不同的 S3 对象,这意味着 S3 连接器最终会写入不同的 S3 对象。

总之,具有您的配置的 S3 接收器连接器不会提供一次性交付保证。


推荐阅读