首页 > 解决方案 > 当主题中没有更多记录时如何在Kafka Consumer中刷新数据批处理

问题描述

考虑这个 Kafka 消费者,它从主题接收数据,将其缓冲到 PreparedStatement 中,当批处理 100K 记录时,它向 DB 发出 INSERT 查询。

直到数据仍然传入,这才有效。但是,例如,当缓冲 20K 记录并且没有更多记录传入时,它仍然等待更多 80K 记录,直到刷新语句。但如果一段时间后停滞不前,我想刷新那些 20K 。我怎样才能做到这一点?我看不出有什么方法可以抓住它。

例如,在使用基于 librdkafka 的 php-rdkafka 扩展的 PHP 中,我RD_KAFKA_RESP_ERR__PARTITION_EOF在达到分区结束时得到,因此在发生这种情况时很容易挂钩缓冲区刷新。

我试图简化代码,所以只剩下重要的部分

public class TestConsumer {

    private final Connection connection;
    private final CountDownLatch shutdownLatch;
    private final KafkaConsumer<String, Message> consumer;
    private int processedCount = 0;

    public TestConsumer(Connection connection) {
        this.connection = connection;
        this.consumer = new KafkaConsumer<>(getConfig(), new StringDeserializer(), new ProtoDeserializer<>(Message.parser()));
        this.shutdownLatch = new CountDownLatch(1);
    }

    public void execute() {
        PreparedStatement statement;
        try {
            statement = getPreparedStatement();
        } catch (SQLException e) {
            throw new RuntimeException(e);
        }

        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            commit(statement);

            consumer.wakeup();
        }));

        consumer.subscribe(Collections.singletonList("source.topic"));

        try {
            while (true) {
                ConsumerRecords<String, Message> records = consumer.poll(Duration.ofMillis(Long.MAX_VALUE));

                records.forEach(record -> {
                    Message message = record.value();
                    try {
                        fillBatch(statement, message);
                        statement.addBatch();
                    } catch (SQLException e) {
                        throw new RuntimeException(e);
                    }
                });

                processedCount += records.count();

                if (processedCount > 100000) {
                    commit(statement);
                }
            }
        } catch (WakeupException e) {
            // ignore, we're closing
        } finally {
            consumer.close();
            shutdownLatch.countDown();
        }
    }

    private void commit(PreparedStatement statement) {
        try {
            statement.executeBatch();
            consumer.commitSync();
            processedCount = 0;
        } catch (SQLException e) {
            throw new RuntimeException(e);
        }
    }


    protected void fillBatch(PreparedStatement statement, Message message) throws SQLException {
        try {
            statement.setTimestamp(1, new Timestamp(message.getTime() * 1000L));
        } catch (UnknownHostException e) {
            throw new RuntimeException(e);
        }
    }

标签: javaapache-kafkakafka-consumer-api

解决方案


我理解你的问题是这样的:

  • 您想使用来自 Kafka 的消息

  • 将它们堆积在内存中多达 10 万条记录

  • 批量提交到数据库

  • 但是您只想等待 t 秒(让我们说 10 秒)

这可以使用 Kafka 内置的消费者批处理以非常有效和可靠的方式实现。如果您可以以某种方式预测消息的平均大小(以字节为单位)。

在 Kafka 消费者配置中,您将设置以下内容:

fetch.min.bytes=> 这应该是 100k x 消息的平均大小

fetch.max.wait.ms=> 这是您的超时时间(以毫秒为单位)(例如 5000 等待 5 秒)

max.partition.fetch.bytes=> 最大。每个分区的数据量。这有助于优化总提取大小

max.poll.records=> 单个轮询中返回的最大记录数..可以设置为 100K

fetch.max.bytes=> 如果要设置单个请求的上限

这样,如果它们符合定义的字节大小,您最多可以获得 100K 记录,但它将等待可配置的毫秒数。

民意调查返回记录后,您可以一次性保存并重复。


推荐阅读