jdbc - Kafka JDBC Sink Connector,批量插入值
问题描述
我每秒收到很多消息(通过 http 协议)(50000 - 100000),并希望将它们保存到 PostgreSql。为此,我决定使用 Kafka JDBC Sink。
消息按一条记录保存到数据库,而不是批量保存。我想在 PostgreSQL 中批量插入 500-1000 条记录的记录。
我在这个问题上找到了一些答案:如何使用batch.size?
我尝试在配置中使用相关选项,但似乎它们没有任何效果。
我的 Kafka JDBC Sink PostgreSql 配置(etc/kafka-connect-jdbc/postgres.properties
):
name=test-sink
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=3
# The topics to consume from - required for sink connectors like this one
topics=jsonb_pkgs
connection.url=jdbc:postgresql://localhost:5432/test?currentSchema=test
auto.create=false
auto.evolve=false
insert.mode=insert
connection.user=postgres
table.name.format=${topic}
connection.password=pwd
batch.size=500
# based on 500*3000byte message size
fetch.min.bytes=1500000
fetch.wait.max.ms=1500
max.poll.records=4000
我还添加了以下选项connect-distributed.properties
:
consumer.fetch.min.bytes=1500000
consumer.fetch.wait.max.ms=1500
虽然每个分区每秒获取超过 1000 条记录,但记录是按 1 保存到 PostgreSQL 中的。
编辑:消费者选项以正确名称添加到其他文件中
我还添加了以下选项etc/schema-registry/connect-avro-standalone.properties
:
# based on 500*3000 byte message size
consumer.fetch.min.bytes=1500000
consumer.fetch.wait.max.ms=1500
consumer.max.poll.records=4000
解决方案
我意识到我误解了文档。记录被一一插入到数据库中。一个事务中插入的记录数取决于batch.size
和consumer.max.poll.records
。我希望批量插入是以另一种方式实现的。我想有一个选项来插入这样的记录:
INSERT INTO table1 (First, Last)
VALUES
('Fred', 'Smith'),
('John', 'Smith'),
('Michael', 'Smith'),
('Robert', 'Smith');
但这似乎是不可能的。
推荐阅读
- mysql - 如何将日期汇总到新列
- c# - Check the Settings.Secure.LOCATION_MODE in Xamarin Android
- python - 如何将随机数与列表中的数字匹配
- http - 在服务 http 请求时处理服务器关闭
- android - 接口作为 Kotlin 中的属性
- c# - 如何修复“在 C# 中用一些空间连续移动文本(标签)”?
- hbase - zookeeper 启动变慢,Hbase 启动失败
- ios - 如果设置了 contentInset,则在长按选择文本时 WKWebView 出现奇怪的滚动行为
- javascript - RXjs,链接长轮询请求
- php - laravel whereMonth 在集合中不起作用