amazon-s3 - Kafka S3 连接器一旦交付保证如何工作
问题描述
我已经阅读了他们的博客并理解了他们的例子。 https://www.confluent.io/blog/apache-kafka-to-amazon-s3-exactly-once/
但我正试图围绕我所拥有的这种情况。我目前的配置是:
"flush.size": "50",
"rotate.interval.ms": "-1",
"rotate.schedule.interval.ms": "300000",
"partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
"partition.duration.ms": "3600000",
"path.format": "YYYY/MM/dd/HH",
"timestamp.extractor": "Wallclock"
根据我读过的关于配置的内容。连接器将在(5 分钟)之后提交50
记录文件或文件,以300000ms
先到者为准。如果连接器将文件上传到 s3 但未能提交到 Kafka,那么由于我设置了轮换计划间隔,Kafka 如何重新上传将覆盖 s3 文件的相同记录?这不会导致 s3 中的重复吗?
解决方案
S3 接收器连接器的文档是另一个很好的资源,它描述了连接器如何保证只向 S3 交付一次,更重要的是,哪些功能组合提供(或不提供)这种保证。
具体来说,该文件中的一个部分说:
为了保证与 的语义一致
TimeBasedPartitioner
,必须将连接器配置为使用确定性实现TimestampExtractor
和确定性轮换策略。确定性时间戳提取器是 Kafka 记录 (timestamp.extractor=Record
) 或记录字段 (timestamp.extractor=RecordField
)。确定性轮换策略配置是rotate.interval.ms
(设置rotate.schedule.interval.ms
是非确定性的,将使一次性保证无效)。
您的 S3 接收器连接器配置确实使用了确定性分区器(通过“partitioner.class”:“io.confluent.connect.storage.partitioner.TimeBasedPartitioner”),但它使用了非确定性 Wallclock 时间戳提取器(通过"timestamp.extractor": "Wallclock"
)。这是不确定的,因为如果连接器确实必须重新启动(例如,由于故障)并重新处理特定记录,它将在稍后重新处理该记录,并且挂钟时间戳提取器将为该记录选择不同的时间。
其次,您的连接器使用该rotate.schedule.interval.ms
选项,文档指出该选项与仅一次交付不兼容。例如,如果连接器确实必须重新处理一系列 Kafka 记录,它可能会将记录分解为与第一次不同的 S3 对象,这意味着 S3 连接器最终会写入不同的 S3 对象。
总之,具有您的配置的 S3 接收器连接器不会提供一次性交付保证。
推荐阅读
- javascript - webRequestBlocking - Firefox 中的 URL glob 模式不匹配
- java - 无法在项目上执行目标无法解析项目的依赖关系无法收集依赖关系
- c - Windows 10 上 C/C++ 错误的 IntelliSense 扩展
- linux - ansible playbook中的'cat'命令不会附加文件
- javascript - 用其他数组中的值更新数组中对象的值
- elasticsearch - 我们如何在 Elastic Search 中的文本类型列上聚合 SUM?
- python - 在全局 boto 资源上使用 moto
- r - 从 List 中提取特定元素
- symfony - 参数太多:查询定义了 0 个参数,你绑定了 1 个错误
- r - 如何将 array1 (2D) 添加到 array2 (3D)?