首页 > 解决方案 > 使用 kafka 控制台消费者消费时显示无效字符

问题描述

在使用 Kafka 控制台使用者或 kt(用于 Kafka 的 GoLang CLI 工具)从 Kafka 主题中消费时,我收到了无效字符。

...
\u0000\ufffd?\u0006app\u0000\u0000\u0000\u0000\u0000\u0000\u003e@\u0001
\u0000\u000cSec-39\u001aSome Actual Value Text\ufffd\ufffd\ufffd\ufffd\ufffd
\ufffd\u0015@\ufffd\ufffd\ufffd\ufffd\ufffd\ufff
...

即使 Kafka 连接实际上可以将正确的数据发送到 SQL 数据库

标签: apache-kafkakafka-consumer-apiconfluent-platform

解决方案


鉴于你说

Kafka connect 实际上可以将正确的数据下沉到 SQL 数据库。

我的假设是您正在对有关该主题的数据使用 Avro 序列化。正确配置的 Kafka Connect 将获取 Avro 数据并对其进行反序列化。

但是,诸如kafka-console-consumer, kt, kafkacatet al 等控制台工具不支持 Avro,因此如果您使用它们从 Avro 编码的主题中读取数据,您会得到一堆奇怪的字符。

要将 Avro 数据读取到命令行,您可以使用kafka-avro-console-consumer

kafka-avro-console-consumer
         --bootstrap-server kafka:29092\
         --topic test_topic_avro \
         --property schema.registry.url=http://schema-registry:8081

编辑:也添加来自@CodeGeas 的建议

或者,可以通过以下方式使用REST 代理读取数据:

# Create a consumer for JSON data
curl -X POST -H "Content-Type: application/vnd.kafka.v2+json" \
         -H "Accept: application/vnd.kafka.v2+json" \
         --data '{"name": "my_consumer_instance", "format": "avro", "auto.offset.reset": "earliest"}' \

# Subscribe the consumer to a topic
         http://kafka-rest-instance:8082/consumers/my_json_consumer
curl -X POST -H "Content-Type: application/vnd.kafka.v2+json" \
         --data '{"topics":["YOUR-TOPIC-NAME"]}' \
         http://kafka-rest-instance:8082/consumers/my_json_consumer/instances/my_consumer_instance/subscription

# Then consume some data from a topic using the base URL in the first response.
curl -X GET -H "Accept: application/vnd.kafka.avro.v2+json" \
         http://kafka-rest-instance:8082/consumers/my_json_consumer/instances/my_consumer_instance/records

稍后,删除消费者:

curl -X DELETE -H "Accept: application/vnd.kafka.avro.v2+json" \
         http://kafka-rest-instance:8082/consumers/my_json_consumer/instances/my_consumer_instance

推荐阅读