首页 > 解决方案 > 配置 Apache Kafka 接收器 jdbc 连接器

问题描述

我想将发送到主题的数据发送到 postgresql 数据库。所以我按照本指南配置了属性文件,如下所示:

name=transaction-sink
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=1
topics=transactions
connection.url=jdbc:postgresql://localhost:5432/db
connection.user=db-user
connection.password=
auto.create=true
insert.mode=insert
table.name.format=transaction
pk.mode=none

我开始连接器

./bin/connect-standalone etc/schema-registry/connect-avro-standalone.properties etc/kafka-connect-jdbc/sink-quickstart-postgresql.properties

接收器连接器已创建但由于此错误而未启动:

Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!

该模式采用 avro 格式并已注册,我可以向主题发送(生成)消息并从中读取(使用)。但我似乎无法将其发送到数据库。

这是我的./etc/schema-registry/connect-avro-standalone.properties

key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081

这是使用 java-api 提供主题的生产者:

properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
properties.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081");

try (KafkaProducer<String, Transaction> producer = new KafkaProducer<>(properties)) {
    Transaction transaction = new Transaction();
    transaction.setFoo("foo");
    transaction.setBar("bar");
    UUID uuid = UUID.randomUUID();
    final ProducerRecord<String, Transaction> record = new ProducerRecord<>(TOPIC, uuid.toString(), transaction);
    producer.send(record);
}

我正在验证数据是否正确序列化和反序列化使用

./bin/kafka-avro-console-consumer --bootstrap-server localhost:9092 \
    --property schema.registry.url=http://localhost:8081 \
    --topic transactions \
    --from-beginning --max-messages 1

数据库已启动并正在运行。

标签: apache-kafkaapache-kafka-connect

解决方案


这是不正确的:

未知的魔术字节可能是由于 id 字段不是架构的一部分

该错误意味着有关该主题的消息未使用 Schema Registry Avro 序列化程序进行序列化。

您如何将数据放在该主题上?

也许所有的消息都有问题,也许只有一些——但默认情况下,这将停止 Kafka Connect 任务。

你可以设置

"errors.tolerance":"all",

让它忽略它无法反序列化的消息。但是,如果它们都没有正确地被 Avro 序列化,这将无济于事,您需要正确地序列化它们,或者选择不同的转换器(例如,如果它们实际上是 JSON,则使用 JSONConverter)。

这些参考资料应该对您有更多帮助:


编辑 :

如果要序列化密钥,StringSerializer则需要在 Connect 配置中使用它:

key.converter=org.apache.kafka.connect.storage.StringConverter

您可以在工作人员处设置它(全局属性,适用于您在其上运行的所有连接器),或者仅针对此连接器(即,将其放在连接器属性本身中,它将覆盖工作人员设置)


推荐阅读