首页 > 解决方案 > 为什么在 flink EXACTLY_ONCE 模式下每次提交都关闭了 kafka 生产者

问题描述

我在我的 flink 应用程序中使用 flink-connector-kafka,语义设置为 EXACTLY_ONCE,我看到日志不断打印 kafka 已关闭并重新连接,如下所示:

Closing the Kafka producer with timeoutMillis = 0 ms.
Proceeding to force close the producer since pending requests could not be completed within timeout 0 ms.

我查看源代码,发现来自生产者提交函数的关闭调用,提交乐趣在finally块中调用recycleTransactionalProducer,而recycleTransactionalProducer乐趣调用关闭乐趣,打印日志,那么为什么每个kafka生产者都关闭了犯罪?

包中的源代码:

org.apache.flink.streaming.connectors.kafka;

org.apache.kafka.clients.producer;

    @Override
    protected void commit(FlinkKafkaProducer.KafkaTransactionState transaction) {
        if (transaction.isTransactional()) {
            try {
                transaction.producer.commitTransaction();
            } finally {
                recycleTransactionalProducer(transaction.producer);
            }
        }
    }

    private void recycleTransactionalProducer(FlinkKafkaInternalProducer<byte[], byte[]> producer) {
        availableTransactionalIds.add(producer.getTransactionalId());
        producer.flush();
        producer.close(Duration.ofSeconds(0));
    }

    private void close(Duration timeout, boolean swallowException) {
        long timeoutMs = timeout.toMillis();
        if (timeoutMs < 0)
            throw new IllegalArgumentException("The timeout cannot be negative.");
        log.info("Closing the Kafka producer with timeoutMillis = {} ms.", timeoutMs);

标签: apache-kafkaapache-flink

解决方案


引用http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Problems-with-FlinkKafkaProducer-closing-after-timeoutMillis-9223372036854775807-ms-td39488.html

...当为 FlinkKafkaProducer 使用完全一次语义时,会为每个并发检查点创建一个固定大小的短期 Kafka 生产者池。当一个检查点开始时,FlinkKafkaProducer 会为该检查点创建一个新的生产者。一旦所述检查点完成,该检查点的生产者就会尝试关闭和回收。因此,如果您使用的是一次性事务 FlinkKafkaProducer,那么看到 Kafka 生产者的日志被关闭是很正常的。


推荐阅读