apache-kafka - 为什么在 flink EXACTLY_ONCE 模式下每次提交都关闭了 kafka 生产者
问题描述
我在我的 flink 应用程序中使用 flink-connector-kafka,语义设置为 EXACTLY_ONCE,我看到日志不断打印 kafka 已关闭并重新连接,如下所示:
Closing the Kafka producer with timeoutMillis = 0 ms.
Proceeding to force close the producer since pending requests could not be completed within timeout 0 ms.
我查看源代码,发现来自生产者提交函数的关闭调用,提交乐趣在finally块中调用recycleTransactionalProducer,而recycleTransactionalProducer乐趣调用关闭乐趣,打印日志,那么为什么每个kafka生产者都关闭了犯罪?
包中的源代码:
org.apache.flink.streaming.connectors.kafka;
org.apache.kafka.clients.producer;
@Override
protected void commit(FlinkKafkaProducer.KafkaTransactionState transaction) {
if (transaction.isTransactional()) {
try {
transaction.producer.commitTransaction();
} finally {
recycleTransactionalProducer(transaction.producer);
}
}
}
private void recycleTransactionalProducer(FlinkKafkaInternalProducer<byte[], byte[]> producer) {
availableTransactionalIds.add(producer.getTransactionalId());
producer.flush();
producer.close(Duration.ofSeconds(0));
}
private void close(Duration timeout, boolean swallowException) {
long timeoutMs = timeout.toMillis();
if (timeoutMs < 0)
throw new IllegalArgumentException("The timeout cannot be negative.");
log.info("Closing the Kafka producer with timeoutMillis = {} ms.", timeoutMs);
解决方案
...当为 FlinkKafkaProducer 使用完全一次语义时,会为每个并发检查点创建一个固定大小的短期 Kafka 生产者池。当一个检查点开始时,FlinkKafkaProducer 会为该检查点创建一个新的生产者。一旦所述检查点完成,该检查点的生产者就会尝试关闭和回收。因此,如果您使用的是一次性事务 FlinkKafkaProducer,那么看到 Kafka 生产者的日志被关闭是很正常的。
推荐阅读
- java - 我试着用谷歌搜索这个。双链表问题。已编辑(2)
- r - 返回具有最少矩阵数的列表
- django - 在使用 ModelSerializer 创建之前编辑字段
- facebook - 如何从 Facebook 评论中读取标记的用户 - Facebook API
- java - 文件未复制到路径中
- javascript - 使用 addEventListener 将数字传递给外部 js 文件
- css - 将 ::-webkit-scrollbar 应用于除一个之外的所有元素
- php - 加载 gRPC 二进制模块失败(gRPC php 教程)
- python - 如何将序列提供给 TensorFlow Keras 模型?
- excel - 如何根据列中的条件将行从一张表复制到另一张表 - 仅粘贴值和格式(而不是公式)?