apache-kafka - Kafka RabbitMQ connector - can only get byte arrays
Problem Description
I've set up a connector to pull from a RabbitMQ queue and push into a Kafka topic. The connector runs and the queue empties out, but when I look at the topic with either kafka-console-consumer or kafkacat, every entry looks like a byte array: [B@xxxxxxxx.
The RabbitMQ message payloads are all JSON. What do I need to do to get JSON back out of Kafka? I've tried value.converter=org.apache.kafka.connect.storage.StringConverter as well as using ByteArrayDeserializer with the console consumer.
connect-standalone.properties:
bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000
plugin.path=/home/robbie/kafka/plugins
RabbitMQSourceConnector.properties:
name=rabbitmq
tasks.max=1
connector.class=io.confluent.connect.rabbitmq.RabbitMQSourceConnector
rabbitmq.prefetch.count=500
rabbitmq.automatic.recovery.enabled=false
rabbitmq.network.recovery.interval.ms=10000
rabbitmq.topology.recovery.enabled=true
rabbitmq.queue=test1
rabbitmq.username=testuser1
rabbitmq.password=xxxxxxxxxxxxxxx
rabbitmq.host=rmqhost
rabbitmq.port=5672
kafka.topic=rabbitmq.test1
Solution
You need to set
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
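For context: the [B@xxxxxxxx strings are Java's default toString() output for a byte[]. The RabbitMQ source connector emits the message payload as raw bytes, and StringConverter serializes that byte array via its toString(), so the "[B@..." text is what actually gets written to the topic. ByteArrayConverter instead passes the bytes through untouched, so the JSON payloads arrive intact. Kafka Connect also supports overriding the converter per connector rather than worker-wide; a sketch of that approach in RabbitMQSourceConnector.properties (connector and queue names taken from the config above):

```
# RabbitMQSourceConnector.properties — per-connector override, an
# alternative to changing connect-standalone.properties for every connector
name=rabbitmq
connector.class=io.confluent.connect.rabbitmq.RabbitMQSourceConnector
tasks.max=1
rabbitmq.queue=test1
kafka.topic=rabbitmq.test1
# Pass the raw JSON bytes through to the topic unchanged
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
```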
I wrote a blog post on this just today :)
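After restarting the connector with the new converter, you can re-check the topic (topic name and bootstrap server taken from the configs above):

```
# Re-read the topic from the beginning; new entries should print as JSON
kafka-console-consumer --bootstrap-server localhost:9092 \
    --topic rabbitmq.test1 \
    --from-beginning
```

Note that records already written under StringConverter were persisted as the literal "[B@..." text and will still display that way; only messages produced after the change carry the raw JSON.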