postgresql - PostgreSQL 和 Kafka Connect 集成问题
问题描述
我正在测试 JDBC Sink 连接器以将记录从 Kafka 转储到 PostgreSQL。这是连接器配置:
{
"name": "jdbc-sink-postgresql-1",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"tasks.max": "1",
"topics": "role",
"connection.url": "jdbc:postgresql://localhost:5432/postgres?user=&password=",
"auto.create": "false",
"insert.mode": "upsert",
"mode":"incrementing",
"table.name.format":"role",
"pk.mode":"record_value",
"pk.fields":"role_id"
}
}
当我运行连接器时,出现以下异常:
java.sql.BatchUpdateException: Batch entry 1 INSERT INTO "role" ("role_id","role_name") VALUES (123,'admin') ON CONFLICT ("role_id") DO UPDATE SET "role_name"=EXCLUDED."role_name" was aborted.
Call getNextException to see the cause.
at org.postgresql.jdbc2.AbstractJdbc2Statement$BatchResultHandler.handleError(AbstractJdbc2Statement.java:2778))
关于我在这里缺少什么的任何指示?如果需要更多信息,请告诉我。
解决方案
所以,问题出在桌子上。这就是我最初创建表格的方式:
CREATE TABLE role(
role_id int PRIMARY KEY,
role_name VARCHAR (255) UNIQUE NOT NULL
);
主题中的测试数据如下所示:
./bin/kafka-avro-console-producer --broker-list localhost:9092 --topic role --property schema.registry.url=http://localhost:8081/ --property value.schema='{"type":"record","name":"myRecord","fields": [{"name": "role_id","type": "int"},{"name": "role_name","type": "string"}]}' --key-serializer org.apache.kafka.common.serialization.StringSerializer --value-serializer io.confluent.kafka.serializers.KafkaAvroSerializer --property print.key=true
{"role_id":122, "role_name":"admin"}
{"role_id":123, "role_name":"admin"}
{"role_id":124, "role_name":"admin"}
{"role_id":125, "role_name":"admin"}
{"role_id":126, "role_name":"admin"}
因此,当我的测试数据一次又一次地为 role_name 字段具有相同的值时,它违反了唯一约束,因此出现了错误。
我做了什么?
我把桌子放下了。
创建了一个没有唯一键约束的新表,并将上述数据推送到 PostgreSQL 没有问题。
推荐阅读
- python - Matplotlib 迭代以结合图例句柄和标签
- reactjs - "Maximum update depth exceeded" in React functional component
- python - Find information for many people in wikidata
- elixir - Can you create conditional plugs in elixir?
- firebase - 无法使用 AT 命令向 Firebase 发送数据
- python - 无法在 execute_script() 之后解析 h1 元素
- javascript - EmberJS + ember-intl: automatically generate additional locales
- node.js - How to create new User with password on MongoDB and connect from NodeJS application
- python - 有没有办法打印(到控制台)并将输入作为美丽的汤页面扩展 -Python
- python - How do I write text lines to file with variables to a file?