apache-kafka - 错误运行消费者时出现未知错误:org.apache.kafka.common.errors.SerializationException:未知魔术字节
问题描述
我正在尝试将消息从 python 发送到 kafka 消费者。但是由于ERROR Unknown error when running consumer: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
Python 正在从 twitter api 正确获取数据,但无法将消息发送给消费者,因此出现错误。任何建议都会有所帮助。
代码:
from tweepy.streaming import StreamListener
from tweepy import OAuthHandler
from tweepy import Stream
from kafka import KafkaProducer
access_token = "xxxx"
access_token_secret = "xxxx"
consumer_key = "xxxx"
consumer_secret = "xxxx"
class StdOutListener(StreamListener):
def on_data(self, data):
producer.send("twitter", data.encode('utf-8'))
print (data)
return True
def on_error(self, status):
print (status)
producer = KafkaProducer(bootstrap_servers='127.0.0.1:9092')
l = StdOutListener()
auth = OAuthHandler(consumer_key, consumer_secret)
auth.set_access_token(access_token, access_token_secret)
stream = Stream(auth, l)
stream.filter(track="Bitcoin")
消费者:
kafka-avro-console-consumer --bootstrap-server 127.0.0.1:9092 --topic twitter
我也尝试过提供属性,
afka-avro-console-consumer --bootstrap-server 127.0.0.1:9092 --topic twitter --property print.key=true --property print.value=true --value-deserializer io.confluent.kafka.serializers.KafkaAvroDeserializer --key-deserializer org.apache.kafka.common.serialization.StringDeserializer
错误:
[2020-08-14 07:23:52,305] ERROR Unknown error when running consumer: (kafka.tools.ConsoleConsumer$:76)
org.apache.kafka.common.errors.SerializationException: Error deserializing Protobuf message for id -1
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
解决方案
错误似乎认为它正在获取 Proto 消息
反序列化 Protobuf消息时出错
但是你正在使用kafka-avro-console-consumer
同时,您的代码正在发送 utf-8 编码的字符串,因此您不需要任何 Confluent 工具,只需kafka-console-consumer
推荐阅读
- laravel - 如何在 Laravel Backpack 中将数据保存在不同的数据库表中
- python - 当文件名不存在时跳过 For 循环中的迭代
- python - 如何让我的伺服电机角度增加 1 度?
- r - R中的e1071包中的峰度计算是否需要3个以上的值?
- c - 有 malloc(): 损坏的最大尺寸问题
- android - RxJava 作为事件总线被多次调用,即使只触发一次
- java - 如何最小化war文件?
- py4j - 给定一个 `py4j.java_gateway.JavaObject` 是否可以保存并加载它?使用 Pickle 不起作用
- sql - Redshift 从表 B 中填充表 A 中的缺失行
- google-kubernetes-engine - 在 GCP 中从 repo 创建容器时没有指定分支名称的选项