Kafka RabbitMQ connector - can only get byte arrays

Problem description

I've set up a connector to pull from a RabbitMQ queue and push into a Kafka topic. The connector runs and the queue empties out. But when I look at the topic with either kafka-console-consumer or kafkacat, every entry looks like a byte array: [B@xxxxxxxx.

The RabbitMQ message payloads are all JSON. What do I need to do to get JSON back out of Kafka? I've tried value.converter=org.apache.kafka.connect.storage.StringConverter, and I've also tried using ByteArrayDeserializer with the console consumer.

connect-standalone.properties:

bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000
plugin.path=/home/robbie/kafka/plugins

RabbitMQSourceConnector.properties:

name=rabbitmq
tasks.max=1
connector.class=io.confluent.connect.rabbitmq.RabbitMQSourceConnector
rabbitmq.prefetch.count=500
rabbitmq.automatic.recovery.enabled=false
rabbitmq.network.recovery.interval.ms=10000
rabbitmq.topology.recovery.enabled=true
rabbitmq.queue=test1
rabbitmq.username=testuser1
rabbitmq.password=xxxxxxxxxxxxxxx
rabbitmq.host=rmqhost
rabbitmq.port=5672
kafka.topic=rabbitmq.test1

Tags: apache-kafka, apache-kafka-connect, confluent-platform

Solution


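You need to set the value converter to ByteArrayConverter, so that the raw message bytes are written to the topic unchanged: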

value.converter=org.apache.kafka.connect.converters.ByteArrayConverter

I wrote a blog post on this just today :)
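
To see where the [B@xxxxxxxx text comes from, here is a small standalone Java sketch. It assumes the connector hands the payload to the converter as a byte[] and that StringConverter turns its input into text via toString(); the class and method names below are made up for illustration and are not part of Kafka Connect.

```java
import java.nio.charset.StandardCharsets;

public class ByteArrayToStringDemo {

    // Hypothetical stand-in for a string-style conversion: byte[] does not
    // override toString(), so the result is the JVM type tag "[B", an "@",
    // and a hex hash code, not the content of the array.
    static String naive(byte[] value) {
        return String.valueOf((Object) value);
    }

    // Decoding the same bytes as UTF-8 recovers the original JSON text.
    // This is roughly what a consumer sees once the bytes are passed
    // through untouched.
    static String decoded(byte[] value) {
        return new String(value, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // Example payload, assumed to be UTF-8 encoded JSON as in the question.
        byte[] payload = "{\"id\":1}".getBytes(StandardCharsets.UTF_8);

        System.out.println(naive(payload));   // starts with "[B@"
        System.out.println(decoded(payload)); // the JSON text
    }
}
```

With ByteArrayConverter the bytes should reach the topic as-is, so a consumer that reads values as strings (the console consumer's default) would print the JSON rather than the array's identity string.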

