java - 当主题中没有更多记录时如何在Kafka Consumer中刷新数据批处理
问题描述
考虑这个 Kafka 消费者,它从主题接收数据,将其缓冲到 PreparedStatement 中,当批处理 100K 记录时,它向 DB 发出 INSERT 查询。
直到数据仍然传入,这才有效。但是,例如,当缓冲 20K 记录并且没有更多记录传入时,它仍然等待更多 80K 记录,直到刷新语句。但如果一段时间后停滞不前,我想刷新那些 20K 。我怎样才能做到这一点?我看不出有什么方法可以抓住它。
例如,在使用基于 librdkafka 的 php-rdkafka 扩展的 PHP 中,我RD_KAFKA_RESP_ERR__PARTITION_EOF
在达到分区结束时得到,因此在发生这种情况时很容易挂钩缓冲区刷新。
我试图简化代码,所以只剩下重要的部分
public class TestConsumer {
private final Connection connection;
private final CountDownLatch shutdownLatch;
private final KafkaConsumer<String, Message> consumer;
private int processedCount = 0;
public TestConsumer(Connection connection) {
this.connection = connection;
this.consumer = new KafkaConsumer<>(getConfig(), new StringDeserializer(), new ProtoDeserializer<>(Message.parser()));
this.shutdownLatch = new CountDownLatch(1);
}
public void execute() {
PreparedStatement statement;
try {
statement = getPreparedStatement();
} catch (SQLException e) {
throw new RuntimeException(e);
}
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
commit(statement);
consumer.wakeup();
}));
consumer.subscribe(Collections.singletonList("source.topic"));
try {
while (true) {
ConsumerRecords<String, Message> records = consumer.poll(Duration.ofMillis(Long.MAX_VALUE));
records.forEach(record -> {
Message message = record.value();
try {
fillBatch(statement, message);
statement.addBatch();
} catch (SQLException e) {
throw new RuntimeException(e);
}
});
processedCount += records.count();
if (processedCount > 100000) {
commit(statement);
}
}
} catch (WakeupException e) {
// ignore, we're closing
} finally {
consumer.close();
shutdownLatch.countDown();
}
}
private void commit(PreparedStatement statement) {
try {
statement.executeBatch();
consumer.commitSync();
processedCount = 0;
} catch (SQLException e) {
throw new RuntimeException(e);
}
}
protected void fillBatch(PreparedStatement statement, Message message) throws SQLException {
try {
statement.setTimestamp(1, new Timestamp(message.getTime() * 1000L));
} catch (UnknownHostException e) {
throw new RuntimeException(e);
}
}
解决方案
我理解你的问题是这样的:
您想使用来自 Kafka 的消息
将它们堆积在内存中多达 10 万条记录
批量提交到数据库
但是您只想等待 t 秒(让我们说 10 秒)
这可以使用 Kafka 内置的消费者批处理以非常有效和可靠的方式实现。如果您可以以某种方式预测消息的平均大小(以字节为单位)。
在 Kafka 消费者配置中,您将设置以下内容:
fetch.min.bytes
=> 这应该是 100k x 消息的平均大小
fetch.max.wait.ms
=> 这是您的超时时间(以毫秒为单位)(例如 5000 等待 5 秒)
max.partition.fetch.bytes
=> 最大。每个分区的数据量。这有助于优化总提取大小
max.poll.records
=> 单个轮询中返回的最大记录数..可以设置为 100K
fetch.max.bytes
=> 如果要设置单个请求的上限
这样,如果它们符合定义的字节大小,您最多可以获得 100K 记录,但它将等待可配置的毫秒数。
民意调查返回记录后,您可以一次性保存并重复。
推荐阅读
- php - 如何在 Mac 上将 Apache 重置为其原始出厂状态
- javascript - 从使用 Web 浏览器设置的客户协议处理程序中删除
- azure - Azure 认知服务 API 提供与 Web 版本不同的情绪评分
- git - 如何比较两个不同 git 提交的数据输出
- python - 处理具有可变分隔符和行长的 pandas 数据帧
- python - 使用 Python3 在 Python2 中安装包
- architecture - 事件溯源中是否有接近智能合约的概念?
- c - c: typedef-d enum 可以检查实际值吗?
- struct - Ocaml 结构,就像我们在 C 中所做的那样
- docker - 如何解决“docker build”只需要 1 个参数。错误?