apache-kafka - 配置 Apache Kafka 接收器 jdbc 连接器
问题描述
我想将发送到主题的数据发送到 postgresql 数据库。所以我按照本指南配置了属性文件,如下所示:
name=transaction-sink
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=1
topics=transactions
connection.url=jdbc:postgresql://localhost:5432/db
connection.user=db-user
connection.password=
auto.create=true
insert.mode=insert
table.name.format=transaction
pk.mode=none
我开始连接器
./bin/connect-standalone etc/schema-registry/connect-avro-standalone.properties etc/kafka-connect-jdbc/sink-quickstart-postgresql.properties
接收器连接器已创建但由于此错误而未启动:
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
该模式采用 avro 格式并已注册,我可以向主题发送(生成)消息并从中读取(使用)。但我似乎无法将其发送到数据库。
这是我的./etc/schema-registry/connect-avro-standalone.properties
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
这是使用 java-api 提供主题的生产者:
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
properties.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081");
try (KafkaProducer<String, Transaction> producer = new KafkaProducer<>(properties)) {
Transaction transaction = new Transaction();
transaction.setFoo("foo");
transaction.setBar("bar");
UUID uuid = UUID.randomUUID();
final ProducerRecord<String, Transaction> record = new ProducerRecord<>(TOPIC, uuid.toString(), transaction);
producer.send(record);
}
我正在验证数据是否正确序列化和反序列化使用
./bin/kafka-avro-console-consumer --bootstrap-server localhost:9092 \
--property schema.registry.url=http://localhost:8081 \
--topic transactions \
--from-beginning --max-messages 1
数据库已启动并正在运行。
解决方案
这是不正确的:
未知的魔术字节可能是由于 id 字段不是架构的一部分
该错误意味着有关该主题的消息未使用 Schema Registry Avro 序列化程序进行序列化。
您如何将数据放在该主题上?
也许所有的消息都有问题,也许只有一些——但默认情况下,这将停止 Kafka Connect 任务。
你可以设置
"errors.tolerance":"all",
让它忽略它无法反序列化的消息。但是,如果它们都没有正确地被 Avro 序列化,这将无济于事,您需要正确地序列化它们,或者选择不同的转换器(例如,如果它们实际上是 JSON,则使用 JSONConverter)。
这些参考资料应该对您有更多帮助:
- https://www.confluent.io/blog/kafka-connect-deep-dive-converters-serialization-explained
- https://www.confluent.io/blog/kafka-connect-deep-dive-error-handling-dead-letter-queues
- http://rmoff.dev/ksldn19-kafka-connect
编辑 :
如果要序列化密钥,StringSerializer
则需要在 Connect 配置中使用它:
key.converter=org.apache.kafka.connect.storage.StringConverter
您可以在工作人员处设置它(全局属性,适用于您在其上运行的所有连接器),或者仅针对此连接器(即,将其放在连接器属性本身中,它将覆盖工作人员设置)
推荐阅读
- r - 合并具有重复标识符的行,同时添加其他列
- php - 用于显示数据库中的 Passers 的查询
- python - Python正则表达式括号匹配不返回正确的子字符串
- java - 使用java在邮件正文内容中获取问号符号
- javascript - 从 javascript 函数获取返回到 html 属性
- python - ValueError:输入在python中包含NaN
- android - React-Native 尝试运行 android,收到有关 /bin/sh 的错误:/usr/local/share/android-sdk/platform-tools/adb: No such file or directory
- r - 有没有办法从 JSON 列中有效地提取多个属性?
- javascript - javascript中的函数没有被调用SignalR
- python - 为 ML 选择最佳特征